meerschaum.utils.dataframe
Utility functions for working with DataFrames.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations

import pathlib
from datetime import datetime, timezone
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which null columns are added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    coerce_mixed_numerics: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    coerce_mixed_numerics: bool, default True
        If `True`, cast mixed integer and float columns between the old and new dataframes into
        numeric values (`decimal.Decimal`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3
    ```
    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone,
        serialize_decimal,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    numeric_cols_precisions_scales = {
        col: get_numeric_precision_scale(None, typ)
        for col, typ in dtypes.items()
        if col and str(typ).lower().startswith('numeric')
    }

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            strip_utc = (
                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                coerce_mixed_numerics
                and
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fallback to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    old_geometry_cols = get_geometry_cols(old_df)
    new_geometry_cols = get_geometry_cols(new_df)
    geometry_cols = set(new_geometry_cols + old_geometry_cols)

    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    for bytes_col in bytes_cols:
        if bytes_col not in delta_df.columns:
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    for geometry_col in geometry_cols:
        if geometry_col not in delta_df.columns:
            continue
        try:
            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
        except Exception:
            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    ignore_all: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    ignore_all: bool, default False
        If `True`, do not attempt to cast any columns to datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object
    ```
    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.dtypes import to_datetime
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [
        col
        for col in pdf.columns
        if col not in ignore_cols
    ] if not ignore_all else []

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns, UTC]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]
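

### A quick illustrative sketch of the helper above (column names are
### hypothetical); a `dict` value is unhashable, so its column is flagged:
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame([{'a': 1, 'b': {'x': 1}}])
###     >>> get_unhashable_cols(df)
###     ['b']
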
614 """ 615 if df is None: 616 return [] 617 if len(df) == 0: 618 return [] 619 620 is_dask = 'dask' in df.__module__ 621 if is_dask: 622 from meerschaum.utils.packages import attempt_import 623 pandas = attempt_import('pandas') 624 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 625 return [ 626 col for col, val in df.iloc[0].items() 627 if not isinstance(val, Hashable) 628 ] 629 630 631def get_json_cols(df: 'pd.DataFrame') -> List[str]: 632 """ 633 Get the columns which contain unhashable objects from a Pandas DataFrame. 634 635 Parameters 636 ---------- 637 df: pd.DataFrame 638 The DataFrame which may contain unhashable objects. 639 640 Returns 641 ------- 642 A list of columns to be encoded as JSON. 643 """ 644 if df is None: 645 return [] 646 647 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 648 if is_dask: 649 df = get_first_valid_dask_partition(df) 650 651 if len(df) == 0: 652 return [] 653 654 cols_indices = { 655 col: df[col].first_valid_index() 656 for col in df.columns 657 } 658 return [ 659 col 660 for col, ix in cols_indices.items() 661 if ( 662 ix is not None 663 and 664 not isinstance(df.loc[ix][col], Hashable) 665 ) 666 ] 667 668 669def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 670 """ 671 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 672 673 Parameters 674 ---------- 675 df: pd.DataFrame 676 The DataFrame which may contain decimal objects. 677 678 Returns 679 ------- 680 A list of columns to treat as numerics. 681 """ 682 if df is None: 683 return [] 684 from decimal import Decimal 685 is_dask = 'dask' in df.__module__ 686 if is_dask: 687 df = get_first_valid_dask_partition(df) 688 689 if len(df) == 0: 690 return [] 691 692 cols_indices = { 693 col: df[col].first_valid_index() 694 for col in df.columns 695 } 696 return [ 697 col 698 for col, ix in cols_indices.items() 699 if ( 700 ix is not None 701 and 702 isinstance(df.loc[ix][col], Decimal) 703 ) 704 ] 705 706 707def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 708 """ 709 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 710 711 Parameters 712 ---------- 713 df: pd.DataFrame 714 The DataFrame which may contain UUID objects. 715 716 Returns 717 ------- 718 A list of columns to treat as UUIDs. 719 """ 720 if df is None: 721 return [] 722 from uuid import UUID 723 is_dask = 'dask' in df.__module__ 724 if is_dask: 725 df = get_first_valid_dask_partition(df) 726 727 if len(df) == 0: 728 return [] 729 730 cols_indices = { 731 col: df[col].first_valid_index() 732 for col in df.columns 733 } 734 return [ 735 col 736 for col, ix in cols_indices.items() 737 if ( 738 ix is not None 739 and 740 isinstance(df.loc[ix][col], UUID) 741 ) 742 ] 743 744 745def get_datetime_cols( 746 df: 'pd.DataFrame', 747 timezone_aware: bool = True, 748 timezone_naive: bool = True, 749) -> List[str]: 750 """ 751 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 752 753 Parameters 754 ---------- 755 df: pd.DataFrame 756 The DataFrame which may contain `datetime` or `Timestamp` objects. 757 758 timezone_aware: bool, default True 759 If `True`, include timezone-aware datetime columns. 760 761 timezone_naive: bool, default True 762 If `True`, include timezone-naive datetime columns. 763 764 Returns 765 ------- 766 A list of columns to treat as datetimes. 
767 """ 768 if not timezone_aware and not timezone_naive: 769 raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.") 770 771 if df is None: 772 return [] 773 774 from datetime import datetime 775 from meerschaum.utils.dtypes import are_dtypes_equal 776 is_dask = 'dask' in df.__module__ 777 if is_dask: 778 df = get_first_valid_dask_partition(df) 779 780 known_dt_cols = [ 781 col 782 for col, typ in df.dtypes.items() 783 if are_dtypes_equal('datetime', str(typ)) 784 ] 785 786 if len(df) == 0: 787 return known_dt_cols 788 789 cols_indices = { 790 col: df[col].first_valid_index() 791 for col in df.columns 792 if col not in known_dt_cols 793 } 794 pydt_cols = [ 795 col 796 for col, ix in cols_indices.items() 797 if ( 798 ix is not None 799 and 800 isinstance(df.loc[ix][col], datetime) 801 ) 802 ] 803 dt_cols_set = set(known_dt_cols + pydt_cols) 804 all_dt_cols = [ 805 col 806 for col in df.columns 807 if col in dt_cols_set 808 ] 809 if timezone_aware and timezone_naive: 810 return all_dt_cols 811 812 known_timezone_aware_dt_cols = [ 813 col 814 for col in known_dt_cols 815 if getattr(df[col], 'tz', None) is not None 816 ] 817 timezone_aware_pydt_cols = [ 818 col 819 for col in pydt_cols 820 if df.loc[cols_indices[col]][col].tzinfo is not None 821 ] 822 timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols) 823 if timezone_aware: 824 return [ 825 col 826 for col in all_dt_cols 827 if col in timezone_aware_pydt_cols 828 ] 829 830 return [ 831 col 832 for col in all_dt_cols 833 if col not in timezone_aware_dt_cols_set 834 ] 835 836 837def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 838 """ 839 Get the columns which contain bytes strings from a Pandas DataFrame. 840 841 Parameters 842 ---------- 843 df: pd.DataFrame 844 The DataFrame which may contain bytes strings. 845 846 Returns 847 ------- 848 A list of columns to treat as bytes. 849 """ 850 if df is None: 851 return [] 852 is_dask = 'dask' in df.__module__ 853 if is_dask: 854 df = get_first_valid_dask_partition(df) 855 856 if len(df) == 0: 857 return [] 858 859 cols_indices = { 860 col: df[col].first_valid_index() 861 for col in df.columns 862 } 863 return [ 864 col 865 for col, ix in cols_indices.items() 866 if ( 867 ix is not None 868 and 869 isinstance(df.loc[ix][col], bytes) 870 ) 871 ] 872 873 874def get_geometry_cols( 875 df: 'pd.DataFrame', 876 with_types_srids: bool = False, 877) -> Union[List[str], Dict[str, Any]]: 878 """ 879 Get the columns which contain shapely objects from a Pandas DataFrame. 880 881 Parameters 882 ---------- 883 df: pd.DataFrame 884 The DataFrame which may contain bytes strings. 885 886 with_types_srids: bool, default False 887 If `True`, return a dictionary mapping columns to geometry types and SRIDs. 888 889 Returns 890 ------- 891 A list of columns to treat as `geometry`. 892 If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID). 
893 """ 894 if df is None: 895 return [] 896 897 is_dask = 'dask' in df.__module__ 898 if is_dask: 899 df = get_first_valid_dask_partition(df) 900 901 if len(df) == 0: 902 return [] 903 904 cols_indices = { 905 col: df[col].first_valid_index() 906 for col in df.columns 907 } 908 geo_cols = [ 909 col 910 for col, ix in cols_indices.items() 911 if ( 912 ix is not None 913 and 914 'shapely' in str(type(df.loc[ix][col])) 915 ) 916 ] 917 if not with_types_srids: 918 return geo_cols 919 920 gpd = mrsm.attempt_import('geopandas', lazy=False) 921 geo_cols_types_srids = {} 922 for col in geo_cols: 923 try: 924 sample_geo_series = gpd.GeoSeries(df[col], crs=None) 925 geometry_types = { 926 geom.geom_type 927 for geom in sample_geo_series 928 if hasattr(geom, 'geom_type') 929 } 930 geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series) 931 srid = ( 932 ( 933 sample_geo_series.crs.sub_crs_list[0].to_epsg() 934 if sample_geo_series.crs.is_compound 935 else sample_geo_series.crs.to_epsg() 936 ) 937 if sample_geo_series.crs 938 else 0 939 ) 940 geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry' 941 if geometry_type != 'geometry' and geometry_has_z: 942 geometry_type = geometry_type + 'Z' 943 except Exception: 944 srid = 0 945 geometry_type = 'geometry' 946 geo_cols_types_srids[col] = (geometry_type, srid) 947 948 return geo_cols_types_srids 949 950 951def enforce_dtypes( 952 df: 'pd.DataFrame', 953 dtypes: Dict[str, str], 954 safe_copy: bool = True, 955 coerce_numeric: bool = True, 956 coerce_timezone: bool = True, 957 strip_timezone: bool = False, 958 debug: bool = False, 959) -> 'pd.DataFrame': 960 """ 961 Enforce the `dtypes` dictionary on a DataFrame. 962 963 Parameters 964 ---------- 965 df: pd.DataFrame 966 The DataFrame on which to enforce dtypes. 967 968 dtypes: Dict[str, str] 969 The data types to attempt to enforce on the DataFrame. 970 971 safe_copy: bool, default True 972 If `True`, create a copy before comparing and modifying the dataframes. 973 Setting to `False` may mutate the DataFrames. 974 See `meerschaum.utils.dataframe.filter_unseen_df`. 975 976 coerce_numeric: bool, default True 977 If `True`, convert float and int collisions to numeric. 978 979 coerce_timezone: bool, default True 980 If `True`, convert datetimes to UTC. 981 982 strip_timezone: bool, default False 983 If `coerce_timezone` and `strip_timezone` are `True`, 984 remove timezone information from datetimes. 985 986 debug: bool, default False 987 Verbosity toggle. 988 989 Returns 990 ------- 991 The Pandas DataFrame with the types enforced. 992 """ 993 import json 994 import functools 995 from meerschaum.utils.debug import dprint 996 from meerschaum.utils.formatting import pprint 997 from meerschaum.utils.dtypes import ( 998 are_dtypes_equal, 999 to_pandas_dtype, 1000 is_dtype_numeric, 1001 attempt_cast_to_numeric, 1002 attempt_cast_to_uuid, 1003 attempt_cast_to_bytes, 1004 attempt_cast_to_geometry, 1005 coerce_timezone as _coerce_timezone, 1006 ) 1007 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 1008 pandas = mrsm.attempt_import('pandas') 1009 is_dask = 'dask' in df.__module__ 1010 if safe_copy: 1011 df = df.copy() 1012 if len(df.columns) == 0: 1013 if debug: 1014 dprint("Incoming DataFrame has no columns. 
def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import functools
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone as _coerce_timezone,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('numeric')
    ]
    geometry_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('geometry') or typ.startswith('geography')
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    bytes_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'bytes'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        functools.partial(
                            attempt_cast_to_numeric,
                            quantize=True,
                            precision=precision,
                            scale=scale,
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if bytes_cols:
        if debug:
            dprint(f"Checking for bytes: {bytes_cols}")
        for col in bytes_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_bytes)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)

    if geometry_cols:
        geopandas = mrsm.attempt_import('geopandas')
        if debug:
            dprint(f"Checking for geometry: {geometry_cols}")
        parsed_geom_cols = []
        for col in geometry_cols:
            try:
                df[col] = df[col].apply(attempt_cast_to_geometry)
                parsed_geom_cols.append(col)
            except Exception as e:
                if debug:
                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")

        if parsed_geom_cols:
            if debug:
                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
            df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0])
            try:
                df.rename_geometry(parsed_geom_cols[0], inplace=True)
            except ValueError:
                pass

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric')
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df
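

### A minimal sketch of `enforce_dtypes()`; the `dtypes` values are Meerschaum
### type names (e.g. 'json', 'Int64'), and the column names are hypothetical:
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame({'a': ['1'], 'b': ['{"x": 1}']})
###     >>> df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
###     >>> df['b'][0]
###     {'x': 1}
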
1244 """ 1245 from meerschaum.utils.dtypes import to_datetime, value_is_null 1246 1247 if df is None: 1248 return None 1249 if not datetime_column: 1250 return None 1251 1252 def compare(a, b): 1253 if a is None: 1254 return b 1255 if b is None: 1256 return a 1257 if minimum: 1258 return a if a < b else b 1259 return a if a > b else b 1260 1261 if isinstance(df, list): 1262 if len(df) == 0: 1263 return None 1264 best_yet = df[0].get(datetime_column, None) 1265 for doc in df: 1266 val = doc.get(datetime_column, None) 1267 best_yet = compare(best_yet, val) 1268 return best_yet 1269 1270 if isinstance(df, dict): 1271 if datetime_column not in df: 1272 return None 1273 best_yet = df[datetime_column][0] 1274 for val in df[datetime_column]: 1275 best_yet = compare(best_yet, val) 1276 return best_yet 1277 1278 if 'DataFrame' in str(type(df)): 1279 from meerschaum.utils.dtypes import are_dtypes_equal 1280 pandas = mrsm.attempt_import('pandas') 1281 is_dask = 'dask' in df.__module__ 1282 1283 if datetime_column not in df.columns: 1284 return None 1285 1286 try: 1287 dt_val = ( 1288 df[datetime_column].min(skipna=True) 1289 if minimum 1290 else df[datetime_column].max(skipna=True) 1291 ) 1292 except Exception: 1293 dt_val = pandas.NA 1294 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1295 dt_val = dt_val.compute() 1296 1297 return ( 1298 to_datetime(dt_val, as_pydatetime=True) 1299 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1300 else (dt_val if not value_is_null(dt_val) else None) 1301 ) 1302 1303 return None 1304 1305 1306def get_unique_index_values( 1307 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1308 indices: List[str], 1309) -> Dict[str, List[Any]]: 1310 """ 1311 Return a dictionary of the unique index values in a DataFrame. 1312 1313 Parameters 1314 ---------- 1315 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1316 The dataframe (or list or dict) which contains index values. 1317 1318 indices: List[str] 1319 The list of index columns. 1320 1321 Returns 1322 ------- 1323 A dictionary mapping indices to unique values. 1324 """ 1325 if df is None: 1326 return {} 1327 if 'dataframe' in str(type(df)).lower(): 1328 pandas = mrsm.attempt_import('pandas') 1329 return { 1330 col: list({ 1331 (val if val is not pandas.NA else None) 1332 for val in df[col].unique() 1333 }) 1334 for col in indices 1335 if col in df.columns 1336 } 1337 1338 unique_indices = defaultdict(lambda: set()) 1339 if isinstance(df, list): 1340 for doc in df: 1341 for index in indices: 1342 if index in doc: 1343 unique_indices[index].add(doc[index]) 1344 1345 elif isinstance(df, dict): 1346 for index in indices: 1347 if index in df: 1348 unique_indices[index] = unique_indices[index].union(set(df[index])) 1349 1350 return {key: list(val) for key, val in unique_indices.items()} 1351 1352 1353def df_is_chunk_generator(df: Any) -> bool: 1354 """ 1355 Determine whether to treat `df` as a chunk generator. 1356 1357 Note this should only be used in a context where generators are expected, 1358 as it will return `True` for any iterable. 1359 1360 Parameters 1361 ---------- 1362 The DataFrame or chunk generator to evaluate. 1363 1364 Returns 1365 ------- 1366 A `bool` indicating whether to treat `df` as a generator. 
1367 """ 1368 return ( 1369 not isinstance(df, (dict, list, str)) 1370 and 'DataFrame' not in str(type(df)) 1371 and isinstance(df, (Generator, Iterable, Iterator)) 1372 ) 1373 1374 1375def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1376 """ 1377 Return the Dask `npartitions` value for a given `chunksize`. 1378 """ 1379 if chunksize == -1: 1380 from meerschaum.config import get_config 1381 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1382 if chunksize is None: 1383 return 1 1384 return -1 * chunksize 1385 1386 1387def df_from_literal( 1388 pipe: Optional[mrsm.Pipe] = None, 1389 literal: str = None, 1390 debug: bool = False 1391) -> 'pd.DataFrame': 1392 """ 1393 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1394 1395 Parameters 1396 ---------- 1397 pipe: Optional['meerschaum.Pipe'], default None 1398 The pipe which will consume the literal value. 1399 1400 Returns 1401 ------- 1402 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1403 and the literal as the value. 1404 """ 1405 from meerschaum.utils.packages import import_pandas 1406 from meerschaum.utils.warnings import error, warn 1407 from meerschaum.utils.debug import dprint 1408 1409 if pipe is None or literal is None: 1410 error("Please provide a Pipe and a literal value") 1411 ### this will raise an error if the columns are undefined 1412 dt_name, val_name = pipe.get_columns('datetime', 'value') 1413 1414 val = literal 1415 if isinstance(literal, str): 1416 if debug: 1417 dprint(f"Received literal string: '{literal}'") 1418 import ast 1419 try: 1420 val = ast.literal_eval(literal) 1421 except Exception: 1422 warn( 1423 "Failed to parse value from string:\n" + f"{literal}" + 1424 "\n\nWill cast as a string instead."\ 1425 ) 1426 val = literal 1427 1428 from datetime import datetime, timezone 1429 now = datetime.now(timezone.utc).replace(tzinfo=None) 1430 1431 pd = import_pandas() 1432 return pd.DataFrame({dt_name: [now], val_name: [val]}) 1433 1434 1435def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1436 """ 1437 Return the first valid Dask DataFrame partition (if possible). 1438 """ 1439 pdf = None 1440 for partition in ddf.partitions: 1441 try: 1442 pdf = partition.compute() 1443 except Exception: 1444 continue 1445 if len(pdf) > 0: 1446 return pdf 1447 _ = mrsm.attempt_import('partd', lazy=False) 1448 return ddf.compute() 1449 1450 1451def query_df( 1452 df: 'pd.DataFrame', 1453 params: Optional[Dict[str, Any]] = None, 1454 begin: Union[datetime, int, None] = None, 1455 end: Union[datetime, int, None] = None, 1456 datetime_column: Optional[str] = None, 1457 select_columns: Optional[List[str]] = None, 1458 omit_columns: Optional[List[str]] = None, 1459 inplace: bool = False, 1460 reset_index: bool = False, 1461 coerce_types: bool = False, 1462 debug: bool = False, 1463) -> 'pd.DataFrame': 1464 """ 1465 Query the dataframe with the params dictionary. 1466 1467 Parameters 1468 ---------- 1469 df: pd.DataFrame 1470 The DataFrame to query against. 1471 1472 params: Optional[Dict[str, Any]], default None 1473 The parameters dictionary to use for the query. 1474 1475 begin: Union[datetime, int, None], default None 1476 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1477 greater than or equal to this value. 
def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df
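

### A hedged sketch of `query_df()`; scalar `params` values filter by equality
### (column names and values are hypothetical):
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame({'color': ['red', 'blue'], 'n': [1, 2]})
###     >>> query_df(df, {'color': 'red'})['n'].tolist()
###     [1]
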
def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    double_precision: int = 15,
    geometry_format: str = 'geojson',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON orientation passed to `DataFrame.to_json()`.

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    double_precision: int, default 15
        The number of decimal places to use when encoding floating point values (maximum 15).

    geometry_format: str, default 'geojson'
        The serialization format for geometry data.
        Accepted values are `geojson`, `wkb_hex`, and `wkt`.

    Returns
    -------
    A JSON string.
    """
    import warnings
    import functools
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        serialize_bytes,
        serialize_decimal,
        serialize_geometry,
    )
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    geometry_cols = get_geometry_cols(df)
    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    for col in bytes_cols:
        df[col] = df[col].apply(serialize_bytes)
    for col in numeric_cols:
        df[col] = df[col].apply(serialize_decimal)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for col in geometry_cols:
            df[col] = df[col].apply(
                functools.partial(
                    serialize_geometry,
                    geometry_format=geometry_format,
                )
            )
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        double_precision=double_precision,
        orient=orient,
        **kwargs
    )
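

### A sketch of `to_json()` with a `decimal.Decimal` column; UUID, bytes,
### numeric, and geometry columns are serialized first so that
### `DataFrame.to_json()` can emit valid JSON (document shape is illustrative):
###
###     >>> import pandas as pd
###     >>> from decimal import Decimal
###     >>> json_str = to_json(pd.DataFrame([{'n': Decimal('1.1'), 's': 'a'}]))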
def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    coerce_mixed_numerics: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    df: pd.DataFrame
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    coerce_mixed_numerics: bool, default True
        If `True`, cast mixed integer and float columns between the old and new dataframes into
        numeric values (`decimal.Decimal`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone,
        serialize_decimal,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    numeric_cols_precisions_scales = {
        col: get_numeric_precision_scale(None, typ)
        for col, typ in dtypes.items()
        if col and str(typ).lower().startswith('numeric')
    }

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            strip_utc = (
                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                coerce_mixed_numerics
                and
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fallback to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    old_geometry_cols = get_geometry_cols(old_df)
    new_geometry_cols = get_geometry_cols(new_df)
    geometry_cols = set(new_geometry_cols + old_geometry_cols)

    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    for bytes_col in bytes_cols:
        if bytes_col not in delta_df.columns:
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    for geometry_col in geometry_cols:
        if geometry_col not in delta_df.columns:
            continue
        try:
            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
        except Exception:
            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")

    return delta_df
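Beyond the single-column example in the docstring, note that a row is only considered "seen" when *every* column matches. A minimal sketch (assuming `pandas` and `meerschaum` are importable; the column names are illustrative):

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'id': [1, 2], 'val': [10.0, 20.0]})
new_df = pd.DataFrame({'id': [2, 3], 'val': [20.0, 30.0]})

### Only (3, 30.0) is new; (2, 20.0) already exists in old_df.
delta = filter_unseen_df(old_df, new_df)
print(delta)
#    id   val
# 0   3  30.0
```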
def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    ignore_all: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    ignore_all: bool, default False
        If `True`, do not attempt to cast any columns to datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.dtypes import to_datetime
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [
        col
        for col in pdf.columns
        if col not in ignore_cols
    ] if not ignore_all else []

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns, UTC]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)
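A column is only cast when *every* row matches the ISO regex, and `ignore_cols` short-circuits the check entirely. A quick sketch (illustrative column names; assumes `pandas` and `meerschaum` are importable):

```python
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({
    'dt': ['2024-01-01 00:00:00', '2024-01-02 00:00:00'],
    'note': ['2024-01-01 00:00:00', 'not a timestamp'],
})

### 'dt' matches the ISO regex on every row, so it is cast to a datetime dtype;
### 'note' is explicitly ignored and keeps its object dtype.
parsed = parse_df_datetimes(df, ignore_cols=['note'])
print(parsed.dtypes)
```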
def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]
def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (to be encoded as JSON) from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]
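Unlike `get_unhashable_cols()`, this inspects each column's first *valid* (non-null) value, so leading nulls don't hide a JSON column. A small sketch (illustrative data):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({
    'meta': [{'tags': ['a', 'b']}, None],
    'name': ['x', 'y'],
})

### 'meta' holds dicts (unhashable), so it is flagged for JSON encoding.
print(get_json_cols(df))
# ['meta']
```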
def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    if df is None:
        return []
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]
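Detection is by value, not dtype: only columns whose first valid value is a `Decimal` qualify, while plain floats and ints do not. For example (illustrative data):

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({
    'price': [Decimal('19.99'), Decimal('24.50')],
    'qty': [1, 2],
})

### Only the Decimal-valued column is returned; 'qty' is a plain int64.
print(get_numeric_cols(df))
# ['price']
```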
def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain UUID objects.

    Returns
    -------
    A list of columns to treat as UUIDs.
    """
    if df is None:
        return []
    from uuid import UUID
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], UUID)
        )
    ]
def get_datetime_cols(
    df: 'pd.DataFrame',
    timezone_aware: bool = True,
    timezone_naive: bool = True,
) -> List[str]:
    """
    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain `datetime` or `Timestamp` objects.

    timezone_aware: bool, default True
        If `True`, include timezone-aware datetime columns.

    timezone_naive: bool, default True
        If `True`, include timezone-naive datetime columns.

    Returns
    -------
    A list of columns to treat as datetimes.
    """
    if not timezone_aware and not timezone_naive:
        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")

    if df is None:
        return []

    from datetime import datetime
    from meerschaum.utils.dtypes import are_dtypes_equal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    known_dt_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal('datetime', str(typ))
    ]

    if len(df) == 0:
        return known_dt_cols

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_dt_cols
    }
    pydt_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], datetime)
        )
    ]
    dt_cols_set = set(known_dt_cols + pydt_cols)
    all_dt_cols = [
        col
        for col in df.columns
        if col in dt_cols_set
    ]
    if timezone_aware and timezone_naive:
        return all_dt_cols

    known_timezone_aware_dt_cols = [
        col
        for col in known_dt_cols
        if getattr(df[col], 'tz', None) is not None
    ]
    timezone_aware_pydt_cols = [
        col
        for col in pydt_cols
        if df.loc[cols_indices[col]][col].tzinfo is not None
    ]
    timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols)
    if timezone_aware:
        return [
            col
            for col in all_dt_cols
            if col in timezone_aware_dt_cols_set
        ]

    return [
        col
        for col in all_dt_cols
        if col not in timezone_aware_dt_cols_set
    ]
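The `timezone_aware` and `timezone_naive` flags let you partition the detected columns. A minimal sketch (illustrative data; `dtype='object'` is used so the values stay as Python `datetime` objects rather than being promoted to a datetime64 dtype):

```python
from datetime import datetime, timezone
import pandas as pd
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({
    'aware': pd.Series([datetime(2024, 1, 1, tzinfo=timezone.utc)], dtype='object'),
    'naive': pd.Series([datetime(2024, 1, 1)], dtype='object'),
})

print(get_datetime_cols(df))                        # ['aware', 'naive']
print(get_datetime_cols(df, timezone_naive=False))  # ['aware']
print(get_datetime_cols(df, timezone_aware=False))  # ['naive']
```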
def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain bytes strings from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain bytes strings.

    Returns
    -------
    A list of columns to treat as bytes.
    """
    if df is None:
        return []
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], bytes)
        )
    ]
def get_geometry_cols(
    df: 'pd.DataFrame',
    with_types_srids: bool = False,
) -> Union[List[str], Dict[str, Any]]:
    """
    Get the columns which contain shapely objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain shapely geometry objects.

    with_types_srids: bool, default False
        If `True`, return a dictionary mapping columns to geometry types and SRIDs.

    Returns
    -------
    A list of columns to treat as `geometry`.
    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    geo_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            'shapely' in str(type(df.loc[ix][col]))
        )
    ]
    if not with_types_srids:
        return geo_cols

    gpd = mrsm.attempt_import('geopandas', lazy=False)
    geo_cols_types_srids = {}
    for col in geo_cols:
        try:
            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
            geometry_types = {
                geom.geom_type
                for geom in sample_geo_series
                if hasattr(geom, 'geom_type')
            }
            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
            srid = (
                (
                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
                    if sample_geo_series.crs.is_compound
                    else sample_geo_series.crs.to_epsg()
                )
                if sample_geo_series.crs
                else 0
            )
            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
            if geometry_type != 'geometry' and geometry_has_z:
                geometry_type = geometry_type + 'Z'
        except Exception:
            srid = 0
            geometry_type = 'geometry'
        geo_cols_types_srids[col] = (geometry_type, srid)

    return geo_cols_types_srids
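A brief sketch (assuming `shapely` and `geopandas` are available; values are illustrative):

```python
import pandas as pd
from shapely.geometry import Point
from meerschaum.utils.dataframe import get_geometry_cols

df = pd.DataFrame({'geom': [Point(0.0, 0.0)], 'name': ['origin']})

print(get_geometry_cols(df))
# ['geom']

### With `with_types_srids=True`, a (type, SRID) tuple is returned per column;
### the SRID is 0 here because no CRS is set on the series.
print(get_geometry_cols(df, with_types_srids=True))
# {'geom': ('Point', 0)}
```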
def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import functools
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone as _coerce_timezone,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('numeric')
    ]
    geometry_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('geometry') or typ.startswith('geography')
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    bytes_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'bytes'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        functools.partial(
                            attempt_cast_to_numeric,
                            quantize=True,
                            precision=precision,
                            scale=scale,
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if bytes_cols:
        if debug:
            dprint(f"Checking for bytes: {bytes_cols}")
        for col in bytes_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_bytes)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)

    if geometry_cols:
        geopandas = mrsm.attempt_import('geopandas')
        if debug:
            dprint(f"Checking for geometry: {geometry_cols}")
        parsed_geom_cols = []
        for col in geometry_cols:
            try:
                df[col] = df[col].apply(attempt_cast_to_geometry)
                parsed_geom_cols.append(col)
            except Exception as e:
                if debug:
                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")

        if parsed_geom_cols:
            if debug:
                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
            df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0])
            try:
                df.rename_geometry(parsed_geom_cols[0], inplace=True)
            except ValueError:
                pass

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric')
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df
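A brief sketch of the common case (assuming `meerschaum` is importable; the `'json'` and `'Int64'` dtype strings follow the conventions used above):

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({
    'id': ['1', '2'],
    'meta': ['{"k": "v"}', '{"k": "w"}'],
})

### JSON strings are parsed into Python objects,
### and 'id' is cast to the nullable Int64 dtype
### (falling back to float64 -> Int64 if the direct cast fails).
enforced = enforce_dtypes(df, {'id': 'Int64', 'meta': 'json'})
print(enforced['meta'][0])    # {'k': 'v'}
print(enforced.dtypes['id'])  # Int64
```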
def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    from meerschaum.utils.dtypes import to_datetime, value_is_null

    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        try:
            dt_val = (
                df[datetime_column].min(skipna=True)
                if minimum
                else df[datetime_column].max(skipna=True)
            )
        except Exception:
            dt_val = pandas.NA
        if is_dask and dt_val is not None and dt_val is not pandas.NA:
            dt_val = dt_val.compute()

        return (
            to_datetime(dt_val, as_pydatetime=True)
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if not value_is_null(dt_val) else None)
        )

    return None
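Note that lists of documents (and dicts of lists) are accepted alongside DataFrames, and integer axes work the same as datetimes. For example (illustrative values):

```python
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': 10, 'val': 'a'},
    {'dt': 3, 'val': 'b'},
]

### The bound is computed by scanning each document's 'dt' value.
print(get_datetime_bound_from_df(docs, 'dt'))                 # 3
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 10
```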
def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(lambda: set())
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}
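Because the uniques are collected into sets, the order of the returned lists is not guaranteed. A quick sketch (illustrative documents):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'city': 'Atlanta', 'station': 1},
    {'city': 'Atlanta', 'station': 2},
    {'city': 'Dallas', 'station': 1},
]

print(get_unique_index_values(docs, ['city']))
# {'city': ['Atlanta', 'Dallas']}  (order may vary)
```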
def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )
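To make the caveat above concrete, dicts, lists, strings, and DataFrames are explicitly excluded, but any other iterable passes the check:

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

df = pd.DataFrame({'a': [1]})
chunks = (df for _ in range(3))

print(df_is_chunk_generator(df))      # False: DataFrames are excluded.
print(df_is_chunk_generator(chunks))  # True: generators qualify.
print(df_is_chunk_generator([df]))    # False: lists are explicitly excluded.
```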
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize
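The sign convention is easy to miss: `None` falls back to a single partition, `-1` is resolved from the configured SQL chunksize, and a concrete chunksize comes back negated. A quick check (values here are arbitrary):

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))  # 1
print(chunksize_to_npartitions(1000))  # -1000
```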
def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: Optional[str] = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse (via `ast.literal_eval`) and insert.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### this will raise an error if the columns are undefined
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}"
                + "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()
def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df
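An end-to-end sketch combining a `params` filter with a `begin` bound (column names and values are illustrative; assumes `pandas` and `meerschaum` are importable):

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})

### Keep rows where color == 'red' and dt >= 2024-01-02.
result = query_df(
    df,
    params={'color': 'red'},
    begin=datetime(2024, 1, 2),
    datetime_column='dt',
    reset_index=True,
)
print(result)
#           dt color
# 0 2024-01-03   red
```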
def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    double_precision: int = 15,
    geometry_format: str = 'geojson',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON layout passed through to `pandas.DataFrame.to_json()`.

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    double_precision: int, default 15
        The number of decimal places to use when encoding floating point values (maximum 15).

    geometry_format: str, default 'geojson'
        The serialization format for geometry data.
        Accepted values are `geojson`, `wkb_hex`, and `wkt`.

    Returns
    -------
    A JSON string.
    """
    import warnings
    import functools
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        serialize_bytes,
        serialize_decimal,
        serialize_geometry,
    )
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    geometry_cols = get_geometry_cols(df)
    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    for col in bytes_cols:
        df[col] = df[col].apply(serialize_bytes)
    for col in numeric_cols:
        df[col] = df[col].apply(serialize_decimal)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for col in geometry_cols:
            df[col] = df[col].apply(
                functools.partial(
                    serialize_geometry,
                    geometry_format=geometry_format,
                )
            )
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        double_precision=double_precision,
        orient=orient,
        **kwargs
    )
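A small sketch (illustrative values; the exact string layout depends on the serializers in `meerschaum.utils.dtypes`):

```python
from decimal import Decimal
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({
    'id': [uuid4()],
    'price': [Decimal('19.99')],
})

### UUIDs are stringified and Decimals serialized before encoding.
print(to_json(df))
# e.g. '[{"id":"<uuid>","price":"19.99"}]' (exact formatting may vary)
```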