meerschaum.utils.dataframe
Utility functions for working with DataFrames.
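For example, to filter previously-seen rows out of a fetched dataframe (a minimal sketch; assumes `pandas` and `meerschaum` are installed):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import filter_unseen_df
>>> old_df = pd.DataFrame({'a': [1, 2]})
>>> new_df = pd.DataFrame({'a': [2, 3]})
>>> filter_unseen_df(old_df, new_df)
   a
0  3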
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations

from datetime import datetime, timezone, date
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING, Tuple,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which the null columns will be added.

    dtypes: Dict[str, Any]
        The data types dictionary which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    coerce_mixed_numerics: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    coerce_mixed_numerics: bool, default True
        If `True`, cast mixed integer and float columns between the old and new dataframes
        into numeric values (`decimal.Decimal`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1, 2]})
    >>> df2 = pd.DataFrame({'a': [2, 3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone,
        serialize_decimal,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
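    ### Resolution order below: the caller-provided `dtypes`, then `old_df.dtypes`,
    ### backfilled with any columns only present in `new_df`.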
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    numeric_cols_precisions_scales = {
        col: get_numeric_precision_scale(None, typ)
        for col, typ in dtypes.items()
        if col and str(typ).lower().startswith('numeric')
    }

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
            strip_utc = (
                _dtypes_col_dtype.startswith('datetime64')
                and 'utc' not in _dtypes_col_dtype.lower()
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                coerce_mixed_numerics
                and
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    old_geometry_cols = get_geometry_cols(old_df)
    new_geometry_cols = get_geometry_cols(new_df)
    geometry_cols = set(new_geometry_cols + old_geometry_cols)

    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(
                lambda x: (json.loads(x) if isinstance(x, str) else x)
            )
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    for bytes_col in bytes_cols:
        if bytes_col not in delta_df.columns:
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    for geometry_col in geometry_cols:
        if geometry_col not in delta_df.columns:
            continue
        try:
            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
        except Exception:
            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    ignore_all: bool = False,
    precision_unit: Optional[str] = None,
    coerce_utc: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    ignore_all: bool, default False
        If `True`, do not attempt to cast any columns to datetimes.

    precision_unit: Optional[str], default None
        If provided, enforce the given precision on the coerced datetime columns.

    coerce_utc: bool, default True
        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df2 = parse_df_datetimes(df)
    >>> df2.dtypes
    a    datetime64[us, UTC]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
    import traceback

    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [
        col
        for col in pdf.columns
        if col not in ignore_cols
    ] if not ignore_all else []

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    def _parse_to_datetime(x):
        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                _parse_to_datetime,
                axis=1,
                meta={
                    col: MRSM_PD_DTYPES['datetime']
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain JSON-like objects (dictionaries or lists)
    from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain dictionaries or lists.

    Returns
    -------
    A list of columns to be encoded as JSON.
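
    Examples
    --------
    A minimal sketch of the detection (the first valid value of each column is sampled):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'meta': [{'k': 'v'}], 'a': [1]})
    >>> get_json_cols(df)
    ['meta']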
662 """ 663 if df is None: 664 return [] 665 666 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 667 if is_dask: 668 df = get_first_valid_dask_partition(df) 669 670 if len(df) == 0: 671 return [] 672 673 cols_indices = { 674 col: df[col].first_valid_index() 675 for col in df.columns 676 } 677 return [ 678 col 679 for col, ix in cols_indices.items() 680 if ( 681 ix is not None 682 and isinstance(df.loc[ix][col], (dict, list)) 683 ) 684 ] 685 686 687def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 688 """ 689 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 690 691 Parameters 692 ---------- 693 df: pd.DataFrame 694 The DataFrame which may contain decimal objects. 695 696 Returns 697 ------- 698 A list of columns to treat as numerics. 699 """ 700 if df is None: 701 return [] 702 from decimal import Decimal 703 is_dask = 'dask' in df.__module__ 704 if is_dask: 705 df = get_first_valid_dask_partition(df) 706 707 if len(df) == 0: 708 return [] 709 710 cols_indices = { 711 col: df[col].first_valid_index() 712 for col in df.columns 713 } 714 return [ 715 col 716 for col, ix in cols_indices.items() 717 if ( 718 ix is not None 719 and 720 isinstance(df.loc[ix][col], Decimal) 721 ) 722 ] 723 724 725def get_bool_cols(df: 'pd.DataFrame') -> List[str]: 726 """ 727 Get the columns which contain `bool` objects from a Pandas DataFrame. 728 729 Parameters 730 ---------- 731 df: pd.DataFrame 732 The DataFrame which may contain bools. 733 734 Returns 735 ------- 736 A list of columns to treat as bools. 737 """ 738 if df is None: 739 return [] 740 741 is_dask = 'dask' in df.__module__ 742 if is_dask: 743 df = get_first_valid_dask_partition(df) 744 745 if len(df) == 0: 746 return [] 747 748 from meerschaum.utils.dtypes import are_dtypes_equal 749 750 return [ 751 col 752 for col, typ in df.dtypes.items() 753 if are_dtypes_equal(str(typ), 'bool') 754 ] 755 756 757def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 758 """ 759 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 760 761 Parameters 762 ---------- 763 df: pd.DataFrame 764 The DataFrame which may contain UUID objects. 765 766 Returns 767 ------- 768 A list of columns to treat as UUIDs. 769 """ 770 if df is None: 771 return [] 772 from uuid import UUID 773 is_dask = 'dask' in df.__module__ 774 if is_dask: 775 df = get_first_valid_dask_partition(df) 776 777 if len(df) == 0: 778 return [] 779 780 cols_indices = { 781 col: df[col].first_valid_index() 782 for col in df.columns 783 } 784 return [ 785 col 786 for col, ix in cols_indices.items() 787 if ( 788 ix is not None 789 and 790 isinstance(df.loc[ix][col], UUID) 791 ) 792 ] 793 794 795def get_datetime_cols( 796 df: 'pd.DataFrame', 797 timezone_aware: bool = True, 798 timezone_naive: bool = True, 799 with_tz_precision: bool = False, 800) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]: 801 """ 802 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 803 804 Parameters 805 ---------- 806 df: pd.DataFrame 807 The DataFrame which may contain `datetime` or `Timestamp` objects. 808 809 timezone_aware: bool, default True 810 If `True`, include timezone-aware datetime columns. 811 812 timezone_naive: bool, default True 813 If `True`, include timezone-naive datetime columns. 814 815 with_tz_precision: bool, default False 816 If `True`, return a dictionary mapping column names to tuples in the form 817 `(timezone, precision)`. 

    Returns
    -------
    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
    (if `with_tz_precision` is `True`).
    """
    if not timezone_aware and not timezone_naive:
        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")

    if df is None:
        return [] if not with_tz_precision else {}

    from datetime import datetime
    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
        """
        Extract the tz + precision tuple from a dtype string.
        """
        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
        tz = (
            None
            if ',' not in meta_str
            else meta_str.split(',', maxsplit=1)[-1]
        )
        precision_abbreviation = (
            meta_str
            if ',' not in meta_str
            else meta_str.split(',')[0]
        )
        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
        return tz, precision

    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
        """
        Return the tz + precision tuple from a Python datetime object.
        """
        return dt.tzname(), 'microsecond'

    known_dt_cols_types = {
        col: str(typ)
        for col, typ in df.dtypes.items()
        if are_dtypes_equal('datetime', str(typ))
    }

    known_dt_cols_tuples = {
        col: get_tz_precision_from_dtype(typ)
        for col, typ in known_dt_cols_types.items()
    }

    if len(df) == 0:
        return (
            list(known_dt_cols_types)
            if not with_tz_precision
            else known_dt_cols_tuples
        )

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_dt_cols_types
    }
    pydt_cols_tuples = {
        col: get_tz_precision_from_datetime(sample_val)
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance((sample_val := df.loc[ix][col]), datetime)
        )
    }

    dt_cols_tuples = {
        **known_dt_cols_tuples,
        **pydt_cols_tuples
    }

    all_dt_cols_tuples = {
        col: dt_cols_tuples[col]
        for col in df.columns
        if col in dt_cols_tuples
    }
    if timezone_aware and timezone_naive:
        return (
            list(all_dt_cols_tuples)
            if not with_tz_precision
            else all_dt_cols_tuples
        )

    known_timezone_aware_dt_cols = [
        col
        for col in known_dt_cols_types
        if getattr(df[col], 'tz', None) is not None
    ]
    timezone_aware_pydt_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in pydt_cols_tuples.items()
        if df.loc[cols_indices[col]][col].tzinfo is not None
    }
    timezone_aware_dt_cols_set = set(
        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
    )
    timezone_aware_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in all_dt_cols_tuples.items()
        if col in timezone_aware_dt_cols_set
    }
    timezone_naive_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in all_dt_cols_tuples.items()
        if col not in timezone_aware_dt_cols_set
    }

    if timezone_aware:
        return (
            list(timezone_aware_cols_tuples)
            if not with_tz_precision
            else timezone_aware_cols_tuples
        )

    return (
        list(timezone_naive_cols_tuples)
        if not with_tz_precision
        else timezone_naive_cols_tuples
    )


def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
    """
    Return a dictionary mapping datetime columns to specific type strings.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain datetime columns.

    Returns
    -------
    A dictionary mapping the datetime columns' names to dtype strings
    (containing timezone and precision metadata).

    Examples
    --------
    >>> from datetime import datetime, timezone
    >>> import pandas as pd
    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
    >>> get_datetime_cols_types(df)
    {'dt_tz_aware': 'datetime64[us, UTC]'}
    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
    >>> get_datetime_cols_types(df)
    {'distant_dt': 'datetime64[us]'}
    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
    >>> get_datetime_cols_types(df)
    {'dt_second': 'datetime64[s]'}
    """
    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
    if not dt_cols_tuples:
        return {}

    return {
        col: (
            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
            if tz is None
            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
        )
        for col, (tz, precision) in dt_cols_tuples.items()
    }


def get_date_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the `date` columns from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain dates.

    Returns
    -------
    A list of columns to treat as dates.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    if df is None:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    known_date_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(typ, 'date')
    ]

    if len(df) == 0:
        return known_date_cols

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_date_cols
    }
    object_date_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance(df.loc[ix][col], date)
        )
    ]

    all_date_cols = set(known_date_cols + object_date_cols)

    return [
        col
        for col in df.columns
        if col in all_date_cols
    ]


def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain bytes strings from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain bytes strings.

    Returns
    -------
    A list of columns to treat as bytes.
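
    Examples
    --------
    A small sketch of the detection (the first valid value of each column is sampled):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'payload': [b'blob'], 'a': [1]})
    >>> get_bytes_cols(df)
    ['payload']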
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    known_bytes_cols = [
        col
        for col, typ in df.dtypes.items()
        if str(typ) == 'binary[pyarrow]'
    ]

    if len(df) == 0:
        return known_bytes_cols

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_bytes_cols
    }
    object_bytes_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance(df.loc[ix][col], bytes)
        )
    ]

    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)

    return [
        col
        for col in df.columns
        if col in all_bytes_cols
    ]


def get_geometry_cols(
    df: 'pd.DataFrame',
    with_types_srids: bool = False,
) -> Union[List[str], Dict[str, Any]]:
    """
    Get the columns which contain shapely objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain shapely objects.

    with_types_srids: bool, default False
        If `True`, return a dictionary mapping columns to geometry types and SRIDs.

    Returns
    -------
    A list of columns to treat as `geometry`.
    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
    """
    if df is None:
        return [] if not with_types_srids else {}

    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return [] if not with_types_srids else {}

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    geo_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            'shapely' in str(type(df.loc[ix][col]))
        )
    ]
    if not with_types_srids:
        return geo_cols

    gpd = mrsm.attempt_import('geopandas', lazy=False)
    geo_cols_types_srids = {}
    for col in geo_cols:
        try:
            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
            geometry_types = {
                geom.geom_type
                for geom in sample_geo_series
                if hasattr(geom, 'geom_type')
            }
            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
            srid = (
                (
                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
                    if sample_geo_series.crs.is_compound
                    else sample_geo_series.crs.to_epsg()
                )
                if sample_geo_series.crs
                else 0
            )
            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
            if geometry_type != 'geometry' and geometry_has_z:
                geometry_type = geometry_type + 'Z'
        except Exception:
            srid = 0
            geometry_type = 'geometry'
        geo_cols_types_srids[col] = (geometry_type, srid)

    return geo_cols_types_srids


def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
    """
    Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
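
    Examples
    --------
    A minimal sketch (assumes `shapely` and `geopandas` are installed; with no CRS set,
    no SRID is emitted):

    >>> import pandas as pd
    >>> from shapely.geometry import Point
    >>> df = pd.DataFrame({'geom': [Point(0, 0)]})
    >>> get_geometry_cols_types(df)
    {'geom': 'geometry[Point]'}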
    """
    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
    new_cols_types = {}
    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
        new_dtype = "geometry"
        modifier = ""
        if not srid and geometry_type.lower() == 'geometry':
            new_cols_types[col] = new_dtype
            continue

        modifier = "["
        if geometry_type.lower() != 'geometry':
            modifier += f"{geometry_type}"

        if srid:
            if modifier != '[':
                modifier += ", "
            modifier += f"{srid}"
        modifier += "]"
        new_cols_types[col] = f"{new_dtype}{modifier}"
    return new_cols_types


def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
    """
    Return a dtypes dictionary mapping special columns to their dtypes.
    """
    return {
        **{col: 'json' for col in get_json_cols(df)},
        **{col: 'uuid' for col in get_uuid_cols(df)},
        **{col: 'bytes' for col in get_bytes_cols(df)},
        **{col: 'bool' for col in get_bool_cols(df)},
        **{col: 'numeric' for col in get_numeric_cols(df)},
        **{col: 'date' for col in get_date_cols(df)},
        **get_datetime_cols_types(df),
        **get_geometry_cols_types(df),
    }


def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    explicit_dtypes: Optional[Dict[str, str]] = None,
    safe_copy: bool = True,
    coerce_numeric: bool = False,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    explicit_dtypes: Optional[Dict[str, str]], default None
        If provided, automatic dtype coercion will respect explicitly configured
        dtypes (`int`, `float`, `numeric`).

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default False
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import functools
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone as _coerce_timezone,
        get_geometry_type_srid,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    explicit_dtypes = explicit_dtypes or {}
    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('numeric')
    ]
    geometry_cols_types_srids = {
        col: get_geometry_type_srid(typ, default_srid=0)
        for col, typ in dtypes.items()
        if typ.startswith('geometry') or typ.startswith('geography')
    }
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    bytes_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'bytes'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        functools.partial(
                            attempt_cast_to_numeric,
                            quantize=True,
                            precision=precision,
                            scale=scale,
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if bytes_cols:
        if debug:
            dprint(f"Checking for bytes: {bytes_cols}")
        for col in bytes_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_bytes)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
                    if debug:
                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
                    continue
                if strip_timezone and ',' not in str(df.dtypes[col]):
                    if debug:
                        dprint(
                            f"Skip UTC coercion (stripped) for column '{col}' "
                            f"({str(df[col].dtype)})."
                        )
                    continue

                if debug:
                    dprint(
                        f"Data type for column '{col}' before timezone coercion: "
                        f"{str(df[col].dtype)}"
                    )

                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
                if debug:
                    dprint(
                        f"Data type for column '{col}' after timezone coercion: "
                        f"{str(df[col].dtype)}"
                    )

    if geometry_cols_types_srids:
        geopandas = mrsm.attempt_import('geopandas')
        if debug:
            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
        parsed_geom_cols = []
        for col in geometry_cols_types_srids:
            try:
                df[col] = df[col].apply(attempt_cast_to_geometry)
                parsed_geom_cols.append(col)
            except Exception as e:
                if debug:
                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")

        if parsed_geom_cols:
            if debug:
                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
            try:
                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
                for col, (_, srid) in geometry_cols_types_srids.items():
                    if srid:
                        if debug:
                            dprint(f"Setting '{col}' to SRID '{srid}'...")
                        _ = df[col].set_crs(srid)
                if parsed_geom_cols[0] not in df.columns:
                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
            except (ValueError, TypeError):
                if debug:
                    import traceback
                    dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
        all_nan = (
            df[col].isnull().all()
            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
            else None
        )
        cast_to_numeric = explicitly_numeric or (
            (
                col in df_numeric_cols
                or (
                    mixed_numeric_types
                    and not (explicitly_float or explicitly_int)
                    and not all_nan
                    and coerce_numeric
                )
            )
        )

        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
            from meerschaum.utils.formatting import make_header
            msg = (
                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
                + "\n"
                + f"  Previous type: {previous_typ}\n"
                + f"  Current type:  {typ if col not in df_numeric_cols else 'Decimal'}"
                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
            ) if cast_to_numeric else (
                f"Will not coerce column '{col}' to numeric.\n"
                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
                f"  Mixed numeric types: {mixed_numeric_types}\n"
                f"  Explicitly float: {explicitly_float}\n"
                f"  Explicitly int: {explicitly_int}\n"
                f"  All NaN: {all_nan}\n"
                f"  Coerce numeric: {coerce_numeric}"
            )
            dprint(msg)

        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
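
    Examples
    --------
    A minimal sketch over a list of documents:

    >>> docs = [{'dt': 3}, {'dt': 1}, {'dt': 2}]
    >>> get_datetime_bound_from_df(docs, 'dt')
    1
    >>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
    3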
    """
    from meerschaum.utils.dtypes import to_datetime, value_is_null

    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        try:
            dt_val = (
                df[datetime_column].min(skipna=True)
                if minimum
                else df[datetime_column].max(skipna=True)
            )
        except Exception:
            dt_val = pandas.NA
        if is_dask and dt_val is not None and dt_val is not pandas.NA:
            dt_val = dt_val.compute()

        return (
            to_datetime(dt_val, as_pydatetime=True)
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if not value_is_null(dt_val) else None)
        )

    return None


def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(lambda: set())
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}


def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
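
    Examples
    --------
    A quick illustration of the iterable check:

    >>> df_is_chunk_generator([{'a': 1}])
    False
    >>> df_is_chunk_generator(iter([]))
    True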
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )


def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize


def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: Optional[str] = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse and assign to the pipe's value column.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import get_current_timestamp

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")

    dt_col = pipe.columns.get(
        'datetime',
        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
    )
    val_col = pipe.get_val_column(debug=debug)

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    now = get_current_timestamp(pipe.precision)
    pd = import_pandas()
    return pd.DataFrame({dt_col: [now], val_col: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()


def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        proto_in_ex_params = get_in_ex_params(params)
        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
            if proto_ex_vals:
                coerce_types = True
                break
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df


def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    double_precision: int = 15,
    geometry_format: str = 'geojson',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON orientation (passed to `pandas.DataFrame.to_json()`).

    date_format: str, default 'iso'
        The default format for timestamps.
2061 2062 date_unit: str, default 'us' 2063 The precision of the timestamps. 2064 2065 double_precision: int, default 15 2066 The number of decimal places to use when encoding floating point values (maximum 15). 2067 2068 geometry_format: str, default 'geojson' 2069 The serialization format for geometry data. 2070 Accepted values are `geojson`, `wkb_hex`, and `wkt`. 2071 2072 Returns 2073 ------- 2074 A JSON string. 2075 """ 2076 import warnings 2077 import functools 2078 from meerschaum.utils.packages import import_pandas 2079 from meerschaum.utils.dtypes import ( 2080 serialize_bytes, 2081 serialize_decimal, 2082 serialize_geometry, 2083 ) 2084 pd = import_pandas() 2085 uuid_cols = get_uuid_cols(df) 2086 bytes_cols = get_bytes_cols(df) 2087 numeric_cols = get_numeric_cols(df) 2088 geometry_cols = get_geometry_cols(df) 2089 geometry_cols_srids = { 2090 col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0') 2091 for col in geometry_cols 2092 } if 'geodataframe' in str(type(df)).lower() else {} 2093 if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols): 2094 df = df.copy() 2095 for col in uuid_cols: 2096 df[col] = df[col].astype(str) 2097 for col in bytes_cols: 2098 df[col] = df[col].apply(serialize_bytes) 2099 for col in numeric_cols: 2100 df[col] = df[col].apply(serialize_decimal) 2101 with warnings.catch_warnings(): 2102 warnings.simplefilter("ignore") 2103 for col in geometry_cols: 2104 srid = geometry_cols_srids.get(col, None) or None 2105 df[col] = df[col].apply( 2106 functools.partial( 2107 serialize_geometry, 2108 geometry_format=geometry_format, 2109 srid=srid, 2110 ) 2111 ) 2112 return df.infer_objects(copy=False).fillna(pd.NA).to_json( 2113 date_format=date_format, 2114 date_unit=date_unit, 2115 double_precision=double_precision, 2116 orient=orient, 2117 **kwargs 2118 ) 2119 2120 2121def to_simple_lines(df: 'pd.DataFrame') -> str: 2122 """ 2123 Serialize a Pandas Dataframe as lines of simple dictionaries. 2124 2125 Parameters 2126 ---------- 2127 df: pd.DataFrame 2128 The dataframe to serialize into simple lines text. 2129 2130 Returns 2131 ------- 2132 A string of simple line dictionaries joined by newlines. 2133 """ 2134 from meerschaum.utils.misc import to_simple_dict 2135 if df is None or len(df) == 0: 2136 return '' 2137 2138 docs = df.to_dict(orient='records') 2139 return '\n'.join(to_simple_dict(doc) for doc in docs) 2140 2141 2142def parse_simple_lines(data: str) -> 'pd.DataFrame': 2143 """ 2144 Parse simple lines text into a DataFrame. 2145 2146 Parameters 2147 ---------- 2148 data: str 2149 The simple lines text to parse into a DataFrame. 2150 2151 Returns 2152 ------- 2153 A dataframe containing the rows serialized in `data`. 2154 """ 2155 from meerschaum.utils.misc import string_to_dict 2156 from meerschaum.utils.packages import import_pandas 2157 pd = import_pandas() 2158 lines = data.splitlines() 2159 try: 2160 docs = [string_to_dict(line) for line in lines] 2161 df = pd.DataFrame(docs) 2162 except Exception: 2163 df = None 2164 2165 if df is None: 2166 raise ValueError("Cannot parse simple lines into a dataframe.") 2167 2168 return df
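Taken together, `query_df` and the serializers above support a filter-then-serialize workflow. A minimal sketch (the column names and values are illustrative, assuming `meerschaum` and `pandas` are installed):

```python
import pandas as pd
from meerschaum.utils.dataframe import query_df, to_json

df = pd.DataFrame({
    'dt': pd.to_datetime(['2025-01-01', '2025-01-02']),
    'flow': [1.5, 2.5],
})

### Keep rows on or after Jan 2 (`begin` is inclusive, `end` is exclusive).
filtered = query_df(df, begin='2025-01-02', datetime_column='dt')

### Serialize the filtered rows as a records-oriented JSON string.
print(to_json(filtered))
```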
25def add_missing_cols_to_df( 26 df: 'pd.DataFrame', 27 dtypes: Dict[str, Any], 28) -> 'pd.DataFrame': 29 """ 30 Add columns from the dtypes dictionary as null columns to a new DataFrame. 31 32 Parameters 33 ---------- 34 df: pd.DataFrame 35 The dataframe we should copy and add null columns. 36 37 dtypes: 38 The data types dictionary which may contain keys not present in `df.columns`. 39 40 Returns 41 ------- 42 A new `DataFrame` with the keys from `dtypes` added as null columns. 43 If `df.dtypes` is the same as `dtypes`, then return a reference to `df`. 44 NOTE: This will not ensure that dtypes are enforced! 45 46 Examples 47 -------- 48 >>> import pandas as pd 49 >>> df = pd.DataFrame([{'a': 1}]) 50 >>> dtypes = {'b': 'Int64'} 51 >>> add_missing_cols_to_df(df, dtypes) 52 a b 53 0 1 <NA> 54 >>> add_missing_cols_to_df(df, dtypes).dtypes 55 a int64 56 b Int64 57 dtype: object 58 >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes 59 a int64 60 dtype: object 61 >>> 62 """ 63 if set(df.columns) == set(dtypes): 64 return df 65 66 from meerschaum.utils.packages import attempt_import 67 from meerschaum.utils.dtypes import to_pandas_dtype 68 pandas = attempt_import('pandas') 69 70 def build_series(dtype: str): 71 return pandas.Series([], dtype=to_pandas_dtype(dtype)) 72 73 assign_kwargs = { 74 str(col): build_series(str(typ)) 75 for col, typ in dtypes.items() 76 if col not in df.columns 77 } 78 df_with_cols = df.assign(**assign_kwargs) 79 for col in assign_kwargs: 80 df_with_cols[col] = df_with_cols[col].fillna(pandas.NA) 81 return df_with_cols
Add columns from the dtypes dictionary as null columns to a new DataFrame.
Parameters
- df (pd.DataFrame): The dataframe we should copy and add null columns.
- dtypes: The data types dictionary which may contain keys not present in `df.columns`.
Returns
- A new `DataFrame` with the keys from `dtypes` added as null columns.
- If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
- NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
   a     b
0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
84def filter_unseen_df( 85 old_df: 'pd.DataFrame', 86 new_df: 'pd.DataFrame', 87 safe_copy: bool = True, 88 dtypes: Optional[Dict[str, Any]] = None, 89 include_unchanged_columns: bool = False, 90 coerce_mixed_numerics: bool = True, 91 debug: bool = False, 92) -> 'pd.DataFrame': 93 """ 94 Left join two DataFrames to find the newest unseen data. 95 96 Parameters 97 ---------- 98 old_df: 'pd.DataFrame' 99 The original (target) dataframe. Acts as a filter on the `new_df`. 100 101 new_df: 'pd.DataFrame' 102 The fetched (source) dataframe. Rows that are contained in `old_df` are removed. 103 104 safe_copy: bool, default True 105 If `True`, create a copy before comparing and modifying the dataframes. 106 Setting to `False` may mutate the DataFrames. 107 108 dtypes: Optional[Dict[str, Any]], default None 109 Optionally specify the datatypes of the dataframe. 110 111 include_unchanged_columns: bool, default False 112 If `True`, include columns which haven't changed on rows which have changed. 113 114 coerce_mixed_numerics: bool, default True 115 If `True`, cast mixed integer and float columns between the old and new dataframes into 116 numeric values (`decimal.Decimal`). 117 118 debug: bool, default False 119 Verbosity toggle. 120 121 Returns 122 ------- 123 A pandas dataframe of the new, unseen rows in `new_df`. 124 125 Examples 126 -------- 127 ```python 128 >>> import pandas as pd 129 >>> df1 = pd.DataFrame({'a': [1,2]}) 130 >>> df2 = pd.DataFrame({'a': [2,3]}) 131 >>> filter_unseen_df(df1, df2) 132 a 133 0 3 134 135 ``` 136 137 """ 138 if old_df is None: 139 return new_df 140 141 if safe_copy: 142 old_df = old_df.copy() 143 new_df = new_df.copy() 144 145 import json 146 import functools 147 import traceback 148 from meerschaum.utils.warnings import warn 149 from meerschaum.utils.packages import import_pandas, attempt_import 150 from meerschaum.utils.dtypes import ( 151 to_pandas_dtype, 152 are_dtypes_equal, 153 attempt_cast_to_numeric, 154 attempt_cast_to_uuid, 155 attempt_cast_to_bytes, 156 attempt_cast_to_geometry, 157 coerce_timezone, 158 serialize_decimal, 159 ) 160 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 161 pd = import_pandas(debug=debug) 162 is_dask = 'dask' in new_df.__module__ 163 if is_dask: 164 pandas = attempt_import('pandas') 165 _ = attempt_import('partd', lazy=False) 166 dd = attempt_import('dask.dataframe') 167 merge = dd.merge 168 NA = pandas.NA 169 else: 170 merge = pd.merge 171 NA = pd.NA 172 173 new_df_dtypes = dict(new_df.dtypes) 174 old_df_dtypes = dict(old_df.dtypes) 175 176 same_cols = set(new_df.columns) == set(old_df.columns) 177 if not same_cols: 178 new_df = add_missing_cols_to_df(new_df, old_df_dtypes) 179 old_df = add_missing_cols_to_df(old_df, new_df_dtypes) 180 181 new_types_missing_from_old = { 182 col: typ 183 for col, typ in new_df_dtypes.items() 184 if col not in old_df_dtypes 185 } 186 old_types_missing_from_new = { 187 col: typ 188 for col, typ in new_df_dtypes.items() 189 if col not in old_df_dtypes 190 } 191 old_df_dtypes.update(new_types_missing_from_old) 192 new_df_dtypes.update(old_types_missing_from_new) 193 194 ### Edge case: two empty lists cast to DFs. 195 elif len(new_df.columns) == 0: 196 return new_df 197 198 try: 199 ### Order matters when checking equality. 200 new_df = new_df[old_df.columns] 201 202 except Exception as e: 203 warn( 204 "Was not able to cast old columns onto new DataFrame. " + 205 f"Are both DataFrames the same shape? 
Error:\n{e}", 206 stacklevel=3, 207 ) 208 return new_df[list(new_df_dtypes.keys())] 209 210 ### assume the old_df knows what it's doing, even if it's technically wrong. 211 if dtypes is None: 212 dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} 213 214 dtypes = { 215 col: to_pandas_dtype(typ) 216 for col, typ in dtypes.items() 217 if col in new_df_dtypes and col in old_df_dtypes 218 } 219 for col, typ in new_df_dtypes.items(): 220 if col not in dtypes: 221 dtypes[col] = typ 222 223 numeric_cols_precisions_scales = { 224 col: get_numeric_precision_scale(None, typ) 225 for col, typ in dtypes.items() 226 if col and str(typ).lower().startswith('numeric') 227 } 228 229 dt_dtypes = { 230 col: typ 231 for col, typ in dtypes.items() 232 if are_dtypes_equal(typ, 'datetime') 233 } 234 non_dt_dtypes = { 235 col: typ 236 for col, typ in dtypes.items() 237 if col not in dt_dtypes 238 } 239 240 cast_non_dt_cols = True 241 try: 242 new_df = new_df.astype(non_dt_dtypes) 243 cast_non_dt_cols = False 244 except Exception as e: 245 warn( 246 f"Was not able to cast the new DataFrame to the given dtypes.\n{e}" 247 ) 248 249 cast_dt_cols = True 250 try: 251 for col, typ in dt_dtypes.items(): 252 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 253 strip_utc = ( 254 _dtypes_col_dtype.startswith('datetime64') 255 and 'utc' not in _dtypes_col_dtype.lower() 256 ) 257 if col in old_df.columns: 258 old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc) 259 if col in new_df.columns: 260 new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc) 261 cast_dt_cols = False 262 except Exception as e: 263 warn(f"Could not cast datetime columns:\n{e}") 264 265 cast_cols = cast_dt_cols or cast_non_dt_cols 266 267 new_numeric_cols_existing = get_numeric_cols(new_df) 268 old_numeric_cols = get_numeric_cols(old_df) 269 for col, typ in {k: v for k, v in dtypes.items()}.items(): 270 if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')): 271 new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float') 272 new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int') 273 new_is_numeric = col in new_numeric_cols_existing 274 old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float') 275 old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int') 276 old_is_numeric = col in old_numeric_cols 277 278 if ( 279 coerce_mixed_numerics 280 and 281 (new_is_float or new_is_int or new_is_numeric) 282 and 283 (old_is_float or old_is_int or old_is_numeric) 284 ): 285 dtypes[col] = attempt_cast_to_numeric 286 cast_cols = True 287 continue 288 289 ### Fallback to object if the types don't match. 290 warn( 291 f"Detected different types for '{col}' " 292 + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), " 293 + "falling back to 'object'..." 
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    old_geometry_cols = get_geometry_cols(old_df)
    new_geometry_cols = get_geometry_cols(new_df)
    geometry_cols = set(new_geometry_cols + old_geometry_cols)

    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(
                lambda x: (json.loads(x) if isinstance(x, str) else x)
            )
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    for bytes_col in bytes_cols:
        if bytes_col not in delta_df.columns:
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    for geometry_col in geometry_cols:
        if geometry_col not in delta_df.columns:
            continue
        try:
            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
        except Exception:
            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")

    return delta_df
Left join two DataFrames to find the newest unseen data.
Parameters
- old_df (pd.DataFrame): The original (target) dataframe. Acts as a filter on the `new_df`.
- new_df (pd.DataFrame): The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames.
- dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- include_unchanged_columns (bool, default False): If `True`, include columns which haven't changed on rows which have changed.
- coerce_mixed_numerics (bool, default True): If `True`, cast mixed integer and float columns between the old and new dataframes into numeric values (`decimal.Decimal`).
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas dataframe of the new, unseen rows in `new_df`.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
427def parse_df_datetimes( 428 df: 'pd.DataFrame', 429 ignore_cols: Optional[Iterable[str]] = None, 430 strip_timezone: bool = False, 431 chunksize: Optional[int] = None, 432 dtype_backend: str = 'numpy_nullable', 433 ignore_all: bool = False, 434 precision_unit: Optional[str] = None, 435 coerce_utc: bool = True, 436 debug: bool = False, 437) -> 'pd.DataFrame': 438 """ 439 Parse a pandas DataFrame for datetime columns and cast as datetimes. 440 441 Parameters 442 ---------- 443 df: pd.DataFrame 444 The pandas DataFrame to parse. 445 446 ignore_cols: Optional[Iterable[str]], default None 447 If provided, do not attempt to coerce these columns as datetimes. 448 449 strip_timezone: bool, default False 450 If `True`, remove the UTC `tzinfo` property. 451 452 chunksize: Optional[int], default None 453 If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe. 454 455 dtype_backend: str, default 'numpy_nullable' 456 If `df` is not a DataFrame and new one needs to be constructed, 457 use this as the datatypes backend. 458 Accepted values are 'numpy_nullable' and 'pyarrow'. 459 460 ignore_all: bool, default False 461 If `True`, do not attempt to cast any columns to datetimes. 462 463 precision_unit: Optional[str], default None 464 If provided, enforce the given precision on the coerced datetime columns. 465 466 coerce_utc: bool, default True 467 Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`). 468 469 debug: bool, default False 470 Verbosity toggle. 471 472 Returns 473 ------- 474 A new pandas DataFrame with the determined datetime columns 475 (usually ISO strings) cast as datetimes. 476 477 Examples 478 -------- 479 ```python 480 >>> import pandas as pd 481 >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 482 >>> df.dtypes 483 a object 484 dtype: object 485 >>> df2 = parse_df_datetimes(df) 486 >>> df2.dtypes 487 a datetime64[us, UTC] 488 dtype: object 489 490 ``` 491 492 """ 493 from meerschaum.utils.packages import import_pandas, attempt_import 494 from meerschaum.utils.debug import dprint 495 from meerschaum.utils.warnings import warn 496 from meerschaum.utils.misc import items_str 497 from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES 498 import traceback 499 500 pd = import_pandas() 501 pandas = attempt_import('pandas') 502 pd_name = pd.__name__ 503 using_dask = 'dask' in pd_name 504 df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__) 505 dask_dataframe = None 506 if using_dask or df_is_dask: 507 npartitions = chunksize_to_npartitions(chunksize) 508 dask_dataframe = attempt_import('dask.dataframe') 509 510 ### if df is a dict, build DataFrame 511 if isinstance(df, pandas.DataFrame): 512 pdf = df 513 elif df_is_dask and isinstance(df, dask_dataframe.DataFrame): 514 pdf = get_first_valid_dask_partition(df) 515 else: 516 if debug: 517 dprint(f"df is of type '{type(df)}'. 
Building {pd.DataFrame}...") 518 519 if using_dask: 520 if isinstance(df, list): 521 keys = set() 522 for doc in df: 523 for key in doc: 524 keys.add(key) 525 df = pd.DataFrame.from_dict( 526 { 527 k: [ 528 doc.get(k, None) 529 for doc in df 530 ] for k in keys 531 }, 532 npartitions=npartitions, 533 ) 534 elif isinstance(df, dict): 535 df = pd.DataFrame.from_dict(df, npartitions=npartitions) 536 elif 'pandas.core.frame.DataFrame' in str(type(df)): 537 df = pd.from_pandas(df, npartitions=npartitions) 538 else: 539 raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.") 540 pandas = attempt_import('pandas') 541 pdf = get_first_valid_dask_partition(df) 542 543 else: 544 df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend) 545 pdf = df 546 547 ### skip parsing if DataFrame is empty 548 if len(pdf) == 0: 549 if debug: 550 dprint("df is empty. Returning original DataFrame without casting datetime columns...") 551 return df 552 553 ignore_cols = set( 554 (ignore_cols or []) + [ 555 col 556 for col, dtype in pdf.dtypes.items() 557 if 'datetime' in str(dtype) 558 ] 559 ) 560 cols_to_inspect = [ 561 col 562 for col in pdf.columns 563 if col not in ignore_cols 564 ] if not ignore_all else [] 565 566 if len(cols_to_inspect) == 0: 567 if debug: 568 dprint("All columns are ignored, skipping datetime detection...") 569 return df.infer_objects(copy=False).fillna(pandas.NA) 570 571 ### apply regex to columns to determine which are ISO datetimes 572 iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+' 573 dt_mask = pdf[cols_to_inspect].astype(str).apply( 574 lambda s: s.str.match(iso_dt_regex).all() 575 ) 576 577 ### list of datetime column names 578 datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]] 579 if not datetime_cols: 580 if debug: 581 dprint("No columns detected as datetimes, returning...") 582 return df.infer_objects(copy=False).fillna(pandas.NA) 583 584 if debug: 585 dprint("Converting columns to datetimes: " + str(datetime_cols)) 586 587 def _parse_to_datetime(x): 588 return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc) 589 590 try: 591 if not using_dask: 592 df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime) 593 else: 594 df[datetime_cols] = df[datetime_cols].apply( 595 _parse_to_datetime, 596 utc=True, 597 axis=1, 598 meta={ 599 col: MRSM_PD_DTYPES['datetime'] 600 for col in datetime_cols 601 } 602 ) 603 except Exception: 604 warn( 605 f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n" 606 + f"{traceback.format_exc()}" 607 ) 608 609 if strip_timezone: 610 for dt in datetime_cols: 611 try: 612 df[dt] = df[dt].dt.tz_localize(None) 613 except Exception: 614 warn( 615 f"Unable to convert column '{dt}' to naive datetime:\n" 616 + f"{traceback.format_exc()}" 617 ) 618 619 return df.fillna(pandas.NA)
Parse a pandas DataFrame for datetime columns and cast as datetimes.
Parameters
- df (pd.DataFrame): The pandas DataFrame to parse.
- ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
- strip_timezone (bool, default False): If `True`, remove the UTC `tzinfo` property.
- chunksize (Optional[int], default None): If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
- dtype_backend (str, default 'numpy_nullable'): If `df` is not a DataFrame and a new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
- ignore_all (bool, default False): If `True`, do not attempt to cast any columns to datetimes.
- precision_unit (Optional[str], default None): If provided, enforce the given precision on the coerced datetime columns.
- coerce_utc (bool, default True): Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
- debug (bool, default False): Verbosity toggle.
Returns
- A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> df.dtypes
a    object
dtype: object
>>> df2 = parse_df_datetimes(df)
>>> df2.dtypes
a    datetime64[us, UTC]
dtype: object
622def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]: 623 """ 624 Get the columns which contain unhashable objects from a Pandas DataFrame. 625 626 Parameters 627 ---------- 628 df: pd.DataFrame 629 The DataFrame which may contain unhashable objects. 630 631 Returns 632 ------- 633 A list of columns. 634 """ 635 if df is None: 636 return [] 637 if len(df) == 0: 638 return [] 639 640 is_dask = 'dask' in df.__module__ 641 if is_dask: 642 from meerschaum.utils.packages import attempt_import 643 pandas = attempt_import('pandas') 644 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 645 return [ 646 col for col, val in df.iloc[0].items() 647 if not isinstance(val, Hashable) 648 ]
Get the columns which contain unhashable objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns.
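A minimal sketch of the behavior (the column names are illustrative): dicts and lists are unhashable, while scalar values are not.

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame([{'a': 1, 'b': {'x': 1}, 'c': [1, 2]}])
print(get_unhashable_cols(df))
# ['b', 'c']
```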
651def get_json_cols(df: 'pd.DataFrame') -> List[str]: 652 """ 653 Get the columns which contain unhashable objects from a Pandas DataFrame. 654 655 Parameters 656 ---------- 657 df: pd.DataFrame 658 The DataFrame which may contain unhashable objects. 659 660 Returns 661 ------- 662 A list of columns to be encoded as JSON. 663 """ 664 if df is None: 665 return [] 666 667 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 668 if is_dask: 669 df = get_first_valid_dask_partition(df) 670 671 if len(df) == 0: 672 return [] 673 674 cols_indices = { 675 col: df[col].first_valid_index() 676 for col in df.columns 677 } 678 return [ 679 col 680 for col, ix in cols_indices.items() 681 if ( 682 ix is not None 683 and isinstance(df.loc[ix][col], (dict, list)) 684 ) 685 ]
Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain dicts or lists to be encoded as JSON.
Returns
- A list of columns to be encoded as JSON.
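For illustration, a small sketch (hypothetical column names); columns whose first valid value is a dict or list are flagged for JSON encoding:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame([{'meta': {'color': 'red'}, 'tags': ['a', 'b'], 'val': 1.0}])
print(get_json_cols(df))
# ['meta', 'tags']
```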
688def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 689 """ 690 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 691 692 Parameters 693 ---------- 694 df: pd.DataFrame 695 The DataFrame which may contain decimal objects. 696 697 Returns 698 ------- 699 A list of columns to treat as numerics. 700 """ 701 if df is None: 702 return [] 703 from decimal import Decimal 704 is_dask = 'dask' in df.__module__ 705 if is_dask: 706 df = get_first_valid_dask_partition(df) 707 708 if len(df) == 0: 709 return [] 710 711 cols_indices = { 712 col: df[col].first_valid_index() 713 for col in df.columns 714 } 715 return [ 716 col 717 for col, ix in cols_indices.items() 718 if ( 719 ix is not None 720 and 721 isinstance(df.loc[ix][col], Decimal) 722 ) 723 ]
Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
- A list of columns to treat as numerics.
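A short sketch (illustrative data): only the column holding `decimal.Decimal` values is returned.

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('1.10')], 'qty': [2]})
print(get_numeric_cols(df))
# ['price']
```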
726def get_bool_cols(df: 'pd.DataFrame') -> List[str]: 727 """ 728 Get the columns which contain `bool` objects from a Pandas DataFrame. 729 730 Parameters 731 ---------- 732 df: pd.DataFrame 733 The DataFrame which may contain bools. 734 735 Returns 736 ------- 737 A list of columns to treat as bools. 738 """ 739 if df is None: 740 return [] 741 742 is_dask = 'dask' in df.__module__ 743 if is_dask: 744 df = get_first_valid_dask_partition(df) 745 746 if len(df) == 0: 747 return [] 748 749 from meerschaum.utils.dtypes import are_dtypes_equal 750 751 return [ 752 col 753 for col, typ in df.dtypes.items() 754 if are_dtypes_equal(str(typ), 'bool') 755 ]
Get the columns which contain `bool` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bools.
Returns
- A list of columns to treat as bools.
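A minimal sketch with an ordinary `bool` column (illustrative data):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_bool_cols

df = pd.DataFrame({'flag': [True, False], 'n': [1, 2]})
print(get_bool_cols(df))
# ['flag']
```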
758def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 759 """ 760 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 761 762 Parameters 763 ---------- 764 df: pd.DataFrame 765 The DataFrame which may contain UUID objects. 766 767 Returns 768 ------- 769 A list of columns to treat as UUIDs. 770 """ 771 if df is None: 772 return [] 773 from uuid import UUID 774 is_dask = 'dask' in df.__module__ 775 if is_dask: 776 df = get_first_valid_dask_partition(df) 777 778 if len(df) == 0: 779 return [] 780 781 cols_indices = { 782 col: df[col].first_valid_index() 783 for col in df.columns 784 } 785 return [ 786 col 787 for col, ix in cols_indices.items() 788 if ( 789 ix is not None 790 and 791 isinstance(df.loc[ix][col], UUID) 792 ) 793 ]
Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
- A list of columns to treat as UUIDs.
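A minimal sketch (illustrative data): columns whose first valid value is a `uuid.UUID` are returned.

```python
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'id': [uuid4()], 'val': [1]})
print(get_uuid_cols(df))
# ['id']
```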
796def get_datetime_cols( 797 df: 'pd.DataFrame', 798 timezone_aware: bool = True, 799 timezone_naive: bool = True, 800 with_tz_precision: bool = False, 801) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]: 802 """ 803 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 804 805 Parameters 806 ---------- 807 df: pd.DataFrame 808 The DataFrame which may contain `datetime` or `Timestamp` objects. 809 810 timezone_aware: bool, default True 811 If `True`, include timezone-aware datetime columns. 812 813 timezone_naive: bool, default True 814 If `True`, include timezone-naive datetime columns. 815 816 with_tz_precision: bool, default False 817 If `True`, return a dictionary mapping column names to tuples in the form 818 `(timezone, precision)`. 819 820 Returns 821 ------- 822 A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples 823 (if `with_tz_precision` is `True`). 824 """ 825 if not timezone_aware and not timezone_naive: 826 raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.") 827 828 if df is None: 829 return [] if not with_tz_precision else {} 830 831 from datetime import datetime 832 from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES 833 is_dask = 'dask' in df.__module__ 834 if is_dask: 835 df = get_first_valid_dask_partition(df) 836 837 def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]: 838 """ 839 Extract the tz + precision tuple from a dtype string. 840 """ 841 meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '') 842 tz = ( 843 None 844 if ',' not in meta_str 845 else meta_str.split(',', maxsplit=1)[-1] 846 ) 847 precision_abbreviation = ( 848 meta_str 849 if ',' not in meta_str 850 else meta_str.split(',')[0] 851 ) 852 precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation] 853 return tz, precision 854 855 def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]: 856 """ 857 Return the tz + precision tuple from a Python datetime object. 
858 """ 859 return dt.tzname(), 'microsecond' 860 861 known_dt_cols_types = { 862 col: str(typ) 863 for col, typ in df.dtypes.items() 864 if are_dtypes_equal('datetime', str(typ)) 865 } 866 867 known_dt_cols_tuples = { 868 col: get_tz_precision_from_dtype(typ) 869 for col, typ in known_dt_cols_types.items() 870 } 871 872 if len(df) == 0: 873 return ( 874 list(known_dt_cols_types) 875 if not with_tz_precision 876 else known_dt_cols_tuples 877 ) 878 879 cols_indices = { 880 col: df[col].first_valid_index() 881 for col in df.columns 882 if col not in known_dt_cols_types 883 } 884 pydt_cols_tuples = { 885 col: get_tz_precision_from_datetime(sample_val) 886 for col, ix in cols_indices.items() 887 if ( 888 ix is not None 889 and 890 isinstance((sample_val := df.loc[ix][col]), datetime) 891 ) 892 } 893 894 dt_cols_tuples = { 895 **known_dt_cols_tuples, 896 **pydt_cols_tuples 897 } 898 899 all_dt_cols_tuples = { 900 col: dt_cols_tuples[col] 901 for col in df.columns 902 if col in dt_cols_tuples 903 } 904 if timezone_aware and timezone_naive: 905 return ( 906 list(all_dt_cols_tuples) 907 if not with_tz_precision 908 else all_dt_cols_tuples 909 ) 910 911 known_timezone_aware_dt_cols = [ 912 col 913 for col in known_dt_cols_types 914 if getattr(df[col], 'tz', None) is not None 915 ] 916 timezone_aware_pydt_cols_tuples = { 917 col: (tz, precision) 918 for col, (tz, precision) in pydt_cols_tuples.items() 919 if df.loc[cols_indices[col]][col].tzinfo is not None 920 } 921 timezone_aware_dt_cols_set = set( 922 known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples) 923 ) 924 timezone_aware_cols_tuples = { 925 col: (tz, precision) 926 for col, (tz, precision) in all_dt_cols_tuples.items() 927 if col in timezone_aware_dt_cols_set 928 } 929 timezone_naive_cols_tuples = { 930 col: (tz, precision) 931 for col, (tz, precision) in all_dt_cols_tuples.items() 932 if col not in timezone_aware_dt_cols_set 933 } 934 935 if timezone_aware: 936 return ( 937 list(timezone_aware_cols_tuples) 938 if not with_tz_precision 939 else timezone_aware_cols_tuples 940 ) 941 942 return ( 943 list(timezone_naive_cols_tuples) 944 if not with_tz_precision 945 else timezone_naive_cols_tuples 946 )
Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain `datetime` or `Timestamp` objects.
- timezone_aware (bool, default True): If `True`, include timezone-aware datetime columns.
- timezone_naive (bool, default True): If `True`, include timezone-naive datetime columns.
- with_tz_precision (bool, default False): If `True`, return a dictionary mapping column names to tuples in the form `(timezone, precision)`.
Returns
- A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples (if `with_tz_precision` is `True`).
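A usage sketch (illustrative data; the reported precision depends on the pandas version's datetime inference):

```python
from datetime import datetime, timezone
import pandas as pd
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({
    'dt_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
    'dt_naive': [datetime(2025, 1, 1)],
})
print(get_datetime_cols(df))
# ['dt_aware', 'dt_naive']
print(get_datetime_cols(df, with_tz_precision=True))
# e.g. {'dt_aware': ('UTC', 'nanosecond'), 'dt_naive': (None, 'nanosecond')}
```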
949def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 950 """ 951 Return a dictionary mapping datetime columns to specific types strings. 952 953 Parameters 954 ---------- 955 df: pd.DataFrame 956 The DataFrame which may contain datetime columns. 957 958 Returns 959 ------- 960 A dictionary mapping the datetime columns' names to dtype strings 961 (containing timezone and precision metadata). 962 963 Examples 964 -------- 965 >>> from datetime import datetime, timezone 966 >>> import pandas as pd 967 >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]}) 968 >>> get_datetime_cols_types(df) 969 {'dt_tz_aware': 'datetime64[us, UTC]'} 970 >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]}) 971 >>> get_datetime_cols_types(df) 972 {'distant_dt': 'datetime64[us]'} 973 >>> df = pd.DataFrame({'dt_second': datetime(2025, 1, 1)}) 974 >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]') 975 >>> get_datetime_cols_types(df) 976 {'dt_second': 'datetime64[s]'} 977 """ 978 from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS 979 dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True) 980 if not dt_cols_tuples: 981 return {} 982 983 return { 984 col: ( 985 f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]" 986 if tz is None 987 else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]" 988 ) 989 for col, (tz, precision) in dt_cols_tuples.items() 990 }
Return a dictionary mapping datetime columns to specific dtype strings.
Parameters
- df (pd.DataFrame): The DataFrame which may contain datetime columns.
Returns
- A dictionary mapping the datetime columns' names to dtype strings (containing timezone and precision metadata).
Examples
>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
>>> get_datetime_cols_types(df)
{'dt_tz_aware': 'datetime64[us, UTC]'}
>>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
>>> get_datetime_cols_types(df)
{'distant_dt': 'datetime64[us]'}
>>> df = pd.DataFrame({'dt_second': datetime(2025, 1, 1)})
>>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
>>> get_datetime_cols_types(df)
{'dt_second': 'datetime64[s]'}
993def get_date_cols(df: 'pd.DataFrame') -> List[str]: 994 """ 995 Get the `date` columns from a Pandas DataFrame. 996 997 Parameters 998 ---------- 999 df: pd.DataFrame 1000 The DataFrame which may contain dates. 1001 1002 Returns 1003 ------- 1004 A list of columns to treat as dates. 1005 """ 1006 from meerschaum.utils.dtypes import are_dtypes_equal 1007 if df is None: 1008 return [] 1009 1010 is_dask = 'dask' in df.__module__ 1011 if is_dask: 1012 df = get_first_valid_dask_partition(df) 1013 1014 known_date_cols = [ 1015 col 1016 for col, typ in df.dtypes.items() 1017 if are_dtypes_equal(typ, 'date') 1018 ] 1019 1020 if len(df) == 0: 1021 return known_date_cols 1022 1023 cols_indices = { 1024 col: df[col].first_valid_index() 1025 for col in df.columns 1026 if col not in known_date_cols 1027 } 1028 object_date_cols = [ 1029 col 1030 for col, ix in cols_indices.items() 1031 if ( 1032 ix is not None 1033 and isinstance(df.loc[ix][col], date) 1034 ) 1035 ] 1036 1037 all_date_cols = set(known_date_cols + object_date_cols) 1038 1039 return [ 1040 col 1041 for col in df.columns 1042 if col in all_date_cols 1043 ]
Get the `date` columns from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain dates.
Returns
- A list of columns to treat as dates.
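A minimal sketch (illustrative data): `datetime.date` values stored in an object column are detected.

```python
from datetime import date
import pandas as pd
from meerschaum.utils.dataframe import get_date_cols

df = pd.DataFrame({'d': [date(2025, 1, 1)], 'n': [1]})
print(get_date_cols(df))
# ['d']
```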
1046def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 1047 """ 1048 Get the columns which contain bytes strings from a Pandas DataFrame. 1049 1050 Parameters 1051 ---------- 1052 df: pd.DataFrame 1053 The DataFrame which may contain bytes strings. 1054 1055 Returns 1056 ------- 1057 A list of columns to treat as bytes. 1058 """ 1059 if df is None: 1060 return [] 1061 1062 is_dask = 'dask' in df.__module__ 1063 if is_dask: 1064 df = get_first_valid_dask_partition(df) 1065 1066 known_bytes_cols = [ 1067 col 1068 for col, typ in df.dtypes.items() 1069 if str(typ) == 'binary[pyarrow]' 1070 ] 1071 1072 if len(df) == 0: 1073 return known_bytes_cols 1074 1075 cols_indices = { 1076 col: df[col].first_valid_index() 1077 for col in df.columns 1078 if col not in known_bytes_cols 1079 } 1080 object_bytes_cols = [ 1081 col 1082 for col, ix in cols_indices.items() 1083 if ( 1084 ix is not None 1085 and isinstance(df.loc[ix][col], bytes) 1086 ) 1087 ] 1088 1089 all_bytes_cols = set(known_bytes_cols + object_bytes_cols) 1090 1091 return [ 1092 col 1093 for col in df.columns 1094 if col in all_bytes_cols 1095 ]
Get the columns which contain bytes strings from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
- A list of columns to treat as bytes.
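A minimal sketch (illustrative data): raw `bytes` values in an object column are detected.

```python
import pandas as pd
from meerschaum.utils.dataframe import get_bytes_cols

df = pd.DataFrame({'payload': [b'\x00\x01'], 'name': ['a']})
print(get_bytes_cols(df))
# ['payload']
```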
1098def get_geometry_cols( 1099 df: 'pd.DataFrame', 1100 with_types_srids: bool = False, 1101) -> Union[List[str], Dict[str, Any]]: 1102 """ 1103 Get the columns which contain shapely objects from a Pandas DataFrame. 1104 1105 Parameters 1106 ---------- 1107 df: pd.DataFrame 1108 The DataFrame which may contain bytes strings. 1109 1110 with_types_srids: bool, default False 1111 If `True`, return a dictionary mapping columns to geometry types and SRIDs. 1112 1113 Returns 1114 ------- 1115 A list of columns to treat as `geometry`. 1116 If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID). 1117 """ 1118 if df is None: 1119 return [] if not with_types_srids else {} 1120 1121 is_dask = 'dask' in df.__module__ 1122 if is_dask: 1123 df = get_first_valid_dask_partition(df) 1124 1125 if len(df) == 0: 1126 return [] if not with_types_srids else {} 1127 1128 cols_indices = { 1129 col: df[col].first_valid_index() 1130 for col in df.columns 1131 } 1132 geo_cols = [ 1133 col 1134 for col, ix in cols_indices.items() 1135 if ( 1136 ix is not None 1137 and 1138 'shapely' in str(type(df.loc[ix][col])) 1139 ) 1140 ] 1141 if not with_types_srids: 1142 return geo_cols 1143 1144 gpd = mrsm.attempt_import('geopandas', lazy=False) 1145 geo_cols_types_srids = {} 1146 for col in geo_cols: 1147 try: 1148 sample_geo_series = gpd.GeoSeries(df[col], crs=None) 1149 geometry_types = { 1150 geom.geom_type 1151 for geom in sample_geo_series 1152 if hasattr(geom, 'geom_type') 1153 } 1154 geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series) 1155 srid = ( 1156 ( 1157 sample_geo_series.crs.sub_crs_list[0].to_epsg() 1158 if sample_geo_series.crs.is_compound 1159 else sample_geo_series.crs.to_epsg() 1160 ) 1161 if sample_geo_series.crs 1162 else 0 1163 ) 1164 geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry' 1165 if geometry_type != 'geometry' and geometry_has_z: 1166 geometry_type = geometry_type + 'Z' 1167 except Exception: 1168 srid = 0 1169 geometry_type = 'geometry' 1170 geo_cols_types_srids[col] = (geometry_type, srid) 1171 1172 return geo_cols_types_srids
Get the columns which contain shapely objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain shapely (geometry) objects.
- with_types_srids (bool, default False): If `True`, return a dictionary mapping columns to geometry types and SRIDs.
Returns
- A list of columns to treat as `geometry`.
- If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
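A usage sketch; this assumes `shapely` is installed (and `geopandas` for the type/SRID lookup), with illustrative data:

```python
import pandas as pd
from shapely.geometry import Point
from meerschaum.utils.dataframe import get_geometry_cols

df = pd.DataFrame({'geom': [Point(0.0, 1.0)], 'val': [1]})
print(get_geometry_cols(df))
# ['geom']
print(get_geometry_cols(df, with_types_srids=True))
# e.g. {'geom': ('Point', 0)} (SRID 0 because no CRS is set)
```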
1175def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 1176 """ 1177 Return a dtypes dictionary mapping columns to specific geometry types (type, srid). 1178 """ 1179 geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True) 1180 new_cols_types = {} 1181 for col, (geometry_type, srid) in geometry_cols_types_srids.items(): 1182 new_dtype = "geometry" 1183 modifier = "" 1184 if not srid and geometry_type.lower() == 'geometry': 1185 new_cols_types[col] = new_dtype 1186 continue 1187 1188 modifier = "[" 1189 if geometry_type.lower() != 'geometry': 1190 modifier += f"{geometry_type}" 1191 1192 if srid: 1193 if modifier != '[': 1194 modifier += ", " 1195 modifier += f"{srid}" 1196 modifier += "]" 1197 new_cols_types[col] = f"{new_dtype}{modifier}" 1198 return new_cols_types
Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
1201def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]: 1202 """ 1203 Return a dtypes dictionary mapping special columns to their dtypes. 1204 """ 1205 return { 1206 **{col: 'json' for col in get_json_cols(df)}, 1207 **{col: 'uuid' for col in get_uuid_cols(df)}, 1208 **{col: 'bytes' for col in get_bytes_cols(df)}, 1209 **{col: 'bool' for col in get_bool_cols(df)}, 1210 **{col: 'numeric' for col in get_numeric_cols(df)}, 1211 **{col: 'date' for col in get_date_cols(df)}, 1212 **get_datetime_cols_types(df), 1213 **get_geometry_cols_types(df), 1214 }
Return a dtypes dictionary mapping special columns to their dtypes.
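Because `get_special_cols()` aggregates the detectors above, one DataFrame can exercise several at once. A minimal sketch (illustrative data):

```python
from decimal import Decimal
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_special_cols

df = pd.DataFrame({
    'meta': [{'color': 'red'}],
    'id': [uuid4()],
    'price': [Decimal('1.10')],
})
print(get_special_cols(df))
# e.g. {'meta': 'json', 'id': 'uuid', 'price': 'numeric'}
```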
1217def enforce_dtypes( 1218 df: 'pd.DataFrame', 1219 dtypes: Dict[str, str], 1220 explicit_dtypes: Optional[Dict[str, str]] = None, 1221 safe_copy: bool = True, 1222 coerce_numeric: bool = False, 1223 coerce_timezone: bool = True, 1224 strip_timezone: bool = False, 1225 debug: bool = False, 1226) -> 'pd.DataFrame': 1227 """ 1228 Enforce the `dtypes` dictionary on a DataFrame. 1229 1230 Parameters 1231 ---------- 1232 df: pd.DataFrame 1233 The DataFrame on which to enforce dtypes. 1234 1235 dtypes: Dict[str, str] 1236 The data types to attempt to enforce on the DataFrame. 1237 1238 explicit_dtypes: Optional[Dict[str, str]], default None 1239 If provided, automatic dtype coersion will respect explicitly configured 1240 dtypes (`int`, `float`, `numeric`). 1241 1242 safe_copy: bool, default True 1243 If `True`, create a copy before comparing and modifying the dataframes. 1244 Setting to `False` may mutate the DataFrames. 1245 See `meerschaum.utils.dataframe.filter_unseen_df`. 1246 1247 coerce_numeric: bool, default False 1248 If `True`, convert float and int collisions to numeric. 1249 1250 coerce_timezone: bool, default True 1251 If `True`, convert datetimes to UTC. 1252 1253 strip_timezone: bool, default False 1254 If `coerce_timezone` and `strip_timezone` are `True`, 1255 remove timezone information from datetimes. 1256 1257 debug: bool, default False 1258 Verbosity toggle. 1259 1260 Returns 1261 ------- 1262 The Pandas DataFrame with the types enforced. 1263 """ 1264 import json 1265 import functools 1266 from meerschaum.utils.debug import dprint 1267 from meerschaum.utils.formatting import pprint 1268 from meerschaum.utils.dtypes import ( 1269 are_dtypes_equal, 1270 to_pandas_dtype, 1271 is_dtype_numeric, 1272 attempt_cast_to_numeric, 1273 attempt_cast_to_uuid, 1274 attempt_cast_to_bytes, 1275 attempt_cast_to_geometry, 1276 coerce_timezone as _coerce_timezone, 1277 get_geometry_type_srid, 1278 ) 1279 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 1280 pandas = mrsm.attempt_import('pandas') 1281 is_dask = 'dask' in df.__module__ 1282 if safe_copy: 1283 df = df.copy() 1284 if len(df.columns) == 0: 1285 if debug: 1286 dprint("Incoming DataFrame has no columns. 
Skipping enforcement...") 1287 return df 1288 1289 explicit_dtypes = explicit_dtypes or {} 1290 pipe_pandas_dtypes = { 1291 col: to_pandas_dtype(typ) 1292 for col, typ in dtypes.items() 1293 } 1294 json_cols = [ 1295 col 1296 for col, typ in dtypes.items() 1297 if typ == 'json' 1298 ] 1299 numeric_cols = [ 1300 col 1301 for col, typ in dtypes.items() 1302 if typ.startswith('numeric') 1303 ] 1304 geometry_cols_types_srids = { 1305 col: get_geometry_type_srid(typ, default_srid=0) 1306 for col, typ in dtypes.items() 1307 if typ.startswith('geometry') or typ.startswith('geography') 1308 } 1309 uuid_cols = [ 1310 col 1311 for col, typ in dtypes.items() 1312 if typ == 'uuid' 1313 ] 1314 bytes_cols = [ 1315 col 1316 for col, typ in dtypes.items() 1317 if typ == 'bytes' 1318 ] 1319 datetime_cols = [ 1320 col 1321 for col, typ in dtypes.items() 1322 if are_dtypes_equal(typ, 'datetime') 1323 ] 1324 df_numeric_cols = get_numeric_cols(df) 1325 if debug: 1326 dprint("Desired data types:") 1327 pprint(dtypes) 1328 dprint("Data types for incoming DataFrame:") 1329 pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()}) 1330 1331 if json_cols and len(df) > 0: 1332 if debug: 1333 dprint(f"Checking columns for JSON encoding: {json_cols}") 1334 for col in json_cols: 1335 if col in df.columns: 1336 try: 1337 df[col] = df[col].apply( 1338 ( 1339 lambda x: ( 1340 json.loads(x) 1341 if isinstance(x, str) 1342 else x 1343 ) 1344 ) 1345 ) 1346 except Exception as e: 1347 if debug: 1348 dprint(f"Unable to parse column '{col}' as JSON:\n{e}") 1349 1350 if numeric_cols: 1351 if debug: 1352 dprint(f"Checking for numerics: {numeric_cols}") 1353 for col in numeric_cols: 1354 precision, scale = get_numeric_precision_scale(None, dtypes.get(col, '')) 1355 if col in df.columns: 1356 try: 1357 df[col] = df[col].apply( 1358 functools.partial( 1359 attempt_cast_to_numeric, 1360 quantize=True, 1361 precision=precision, 1362 scale=scale, 1363 ) 1364 ) 1365 except Exception as e: 1366 if debug: 1367 dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}") 1368 1369 if uuid_cols: 1370 if debug: 1371 dprint(f"Checking for UUIDs: {uuid_cols}") 1372 for col in uuid_cols: 1373 if col in df.columns: 1374 try: 1375 df[col] = df[col].apply(attempt_cast_to_uuid) 1376 except Exception as e: 1377 if debug: 1378 dprint(f"Unable to parse column '{col}' as UUID:\n{e}") 1379 1380 if bytes_cols: 1381 if debug: 1382 dprint(f"Checking for bytes: {bytes_cols}") 1383 for col in bytes_cols: 1384 if col in df.columns: 1385 try: 1386 df[col] = df[col].apply(attempt_cast_to_bytes) 1387 except Exception as e: 1388 if debug: 1389 dprint(f"Unable to parse column '{col}' as bytes:\n{e}") 1390 1391 if datetime_cols and coerce_timezone: 1392 if debug: 1393 dprint(f"Checking for datetime conversion: {datetime_cols}") 1394 for col in datetime_cols: 1395 if col in df.columns: 1396 if not strip_timezone and 'utc' in str(df.dtypes[col]).lower(): 1397 if debug: 1398 dprint(f"Skip UTC coersion for column '{col}' ({str(df[col].dtype)}).") 1399 continue 1400 if strip_timezone and ',' not in str(df.dtypes[col]): 1401 if debug: 1402 dprint( 1403 f"Skip UTC coersion (stripped) for column '{col}' " 1404 f"({str(df[col].dtype)})." 
1405 ) 1406 continue 1407 1408 if debug: 1409 dprint( 1410 f"Data type for column '{col}' before timezone coersion: " 1411 f"{str(df[col].dtype)}" 1412 ) 1413 1414 df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone) 1415 if debug: 1416 dprint( 1417 f"Data type for column '{col}' after timezone coersion: " 1418 f"{str(df[col].dtype)}" 1419 ) 1420 1421 if geometry_cols_types_srids: 1422 geopandas = mrsm.attempt_import('geopandas') 1423 if debug: 1424 dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}") 1425 parsed_geom_cols = [] 1426 for col in geometry_cols_types_srids: 1427 try: 1428 df[col] = df[col].apply(attempt_cast_to_geometry) 1429 parsed_geom_cols.append(col) 1430 except Exception as e: 1431 if debug: 1432 dprint(f"Unable to parse column '{col}' as geometry:\n{e}") 1433 1434 if parsed_geom_cols: 1435 if debug: 1436 dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...") 1437 try: 1438 _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]] 1439 df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid) 1440 for col, (_, srid) in geometry_cols_types_srids.items(): 1441 if srid: 1442 if debug: 1443 dprint(f"Setting '{col}' to SRID '{srid}'...") 1444 _ = df[col].set_crs(srid) 1445 if parsed_geom_cols[0] not in df.columns: 1446 df.rename_geometry(parsed_geom_cols[0], inplace=True) 1447 except (ValueError, TypeError): 1448 if debug: 1449 import traceback 1450 dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}") 1451 1452 df_dtypes = {c: str(t) for c, t in df.dtypes.items()} 1453 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1454 if debug: 1455 dprint("Data types match. Exiting enforcement...") 1456 return df 1457 1458 common_dtypes = {} 1459 common_diff_dtypes = {} 1460 for col, typ in pipe_pandas_dtypes.items(): 1461 if col in df_dtypes: 1462 common_dtypes[col] = typ 1463 if not are_dtypes_equal(typ, df_dtypes[col]): 1464 common_diff_dtypes[col] = df_dtypes[col] 1465 1466 if debug: 1467 dprint("Common columns with different dtypes:") 1468 pprint(common_diff_dtypes) 1469 1470 detected_dt_cols = {} 1471 for col, typ in common_diff_dtypes.items(): 1472 if 'datetime' in typ and 'datetime' in common_dtypes[col]: 1473 df_dtypes[col] = typ 1474 detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col]) 1475 for col in detected_dt_cols: 1476 del common_diff_dtypes[col] 1477 1478 if debug: 1479 dprint("Common columns with different dtypes (after dates):") 1480 pprint(common_diff_dtypes) 1481 1482 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1483 if debug: 1484 dprint( 1485 "The incoming DataFrame has mostly the same types, skipping enforcement." 1486 + "The only detected difference was in the following datetime columns." 
1487 ) 1488 pprint(detected_dt_cols) 1489 return df 1490 1491 for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items(): 1492 previous_typ = common_dtypes[col] 1493 mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ)) 1494 explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float') 1495 explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int') 1496 explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric') 1497 all_nan = ( 1498 df[col].isnull().all() 1499 if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int) 1500 else None 1501 ) 1502 cast_to_numeric = explicitly_numeric or ( 1503 ( 1504 col in df_numeric_cols 1505 or ( 1506 mixed_numeric_types 1507 and not (explicitly_float or explicitly_int) 1508 and not all_nan 1509 and coerce_numeric 1510 ) 1511 ) 1512 ) 1513 1514 if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types): 1515 from meerschaum.utils.formatting import make_header 1516 msg = ( 1517 make_header(f"Coercing column '{col}' to numeric:", left_pad=0) 1518 + "\n" 1519 + f" Previous type: {previous_typ}\n" 1520 + f" Current type: {typ if col not in df_numeric_cols else 'Decimal'}" 1521 + ("\n Column is explicitly numeric." if explicitly_numeric else "") 1522 ) if cast_to_numeric else ( 1523 f"Will not coerce column '{col}' to numeric.\n" 1524 f" Numeric columns in dataframe: {df_numeric_cols}\n" 1525 f" Mixed numeric types: {mixed_numeric_types}\n" 1526 f" Explicitly float: {explicitly_float}\n" 1527 f" Explicitly int: {explicitly_int}\n" 1528 f" All NaN: {all_nan}\n" 1529 f" Coerce numeric: {coerce_numeric}" 1530 ) 1531 dprint(msg) 1532 1533 if cast_to_numeric: 1534 common_dtypes[col] = attempt_cast_to_numeric 1535 common_diff_dtypes[col] = attempt_cast_to_numeric 1536 1537 for d in common_diff_dtypes: 1538 t = common_dtypes[d] 1539 if debug: 1540 dprint(f"Casting column {d} to dtype {t}.") 1541 try: 1542 df[d] = ( 1543 df[d].apply(t) 1544 if callable(t) 1545 else df[d].astype(t) 1546 ) 1547 except Exception as e: 1548 if debug: 1549 dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}") 1550 if 'int' in str(t).lower(): 1551 try: 1552 df[d] = df[d].astype('float64').astype(t) 1553 except Exception: 1554 if debug: 1555 dprint(f"Was unable to convert to float then {t}.") 1556 return df
Enforce the `dtypes` dictionary on a DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame on which to enforce dtypes.
- dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
- explicit_dtypes (Optional[Dict[str, str]], default None): If provided, automatic dtype coercion will respect explicitly configured dtypes (`int`, `float`, `numeric`).
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames. See `meerschaum.utils.dataframe.filter_unseen_df`.
- coerce_numeric (bool, default False): If `True`, convert float and int collisions to numeric.
- coerce_timezone (bool, default True): If `True`, convert datetimes to UTC.
- strip_timezone (bool, default False): If `coerce_timezone` and `strip_timezone` are `True`, remove timezone information from datetimes.
- debug (bool, default False): Verbosity toggle.
Returns
- The Pandas DataFrame with the types enforced.
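A usage sketch (hypothetical column names): a float column is cast to the nullable `Int64` dtype, and JSON strings are deserialized for a column declared as `json`.

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': [1.0, 2.0], 'meta': ['{"x": 1}', '{"y": 2}']})
enforced = enforce_dtypes(df, {'a': 'Int64', 'meta': 'json'})
print(enforced['a'].dtype)
# Int64
print(enforced['meta'][0])
# {'x': 1}
```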
1559def get_datetime_bound_from_df( 1560 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1561 datetime_column: str, 1562 minimum: bool = True, 1563) -> Union[int, datetime, None]: 1564 """ 1565 Return the minimum or maximum datetime (or integer) from a DataFrame. 1566 1567 Parameters 1568 ---------- 1569 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1570 The DataFrame, list, or dict which contains the range axis. 1571 1572 datetime_column: str 1573 The name of the datetime (or int) column. 1574 1575 minimum: bool 1576 Whether to return the minimum (default) or maximum value. 1577 1578 Returns 1579 ------- 1580 The minimum or maximum datetime value in the dataframe, or `None`. 1581 """ 1582 from meerschaum.utils.dtypes import to_datetime, value_is_null 1583 1584 if df is None: 1585 return None 1586 if not datetime_column: 1587 return None 1588 1589 def compare(a, b): 1590 if a is None: 1591 return b 1592 if b is None: 1593 return a 1594 if minimum: 1595 return a if a < b else b 1596 return a if a > b else b 1597 1598 if isinstance(df, list): 1599 if len(df) == 0: 1600 return None 1601 best_yet = df[0].get(datetime_column, None) 1602 for doc in df: 1603 val = doc.get(datetime_column, None) 1604 best_yet = compare(best_yet, val) 1605 return best_yet 1606 1607 if isinstance(df, dict): 1608 if datetime_column not in df: 1609 return None 1610 best_yet = df[datetime_column][0] 1611 for val in df[datetime_column]: 1612 best_yet = compare(best_yet, val) 1613 return best_yet 1614 1615 if 'DataFrame' in str(type(df)): 1616 from meerschaum.utils.dtypes import are_dtypes_equal 1617 pandas = mrsm.attempt_import('pandas') 1618 is_dask = 'dask' in df.__module__ 1619 1620 if datetime_column not in df.columns: 1621 return None 1622 1623 try: 1624 dt_val = ( 1625 df[datetime_column].min(skipna=True) 1626 if minimum 1627 else df[datetime_column].max(skipna=True) 1628 ) 1629 except Exception: 1630 dt_val = pandas.NA 1631 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1632 dt_val = dt_val.compute() 1633 1634 return ( 1635 to_datetime(dt_val, as_pydatetime=True) 1636 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1637 else (dt_val if not value_is_null(dt_val) else None) 1638 ) 1639 1640 return None
Return the minimum or maximum datetime (or integer) from a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
- datetime_column (str): The name of the datetime (or int) column.
- minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
- The minimum or maximum datetime value in the dataframe, or `None`.
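A short sketch with a list of documents (the same logic applies to DataFrames and dicts of lists):

```python
from datetime import datetime, timezone
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {'dt': datetime(2024, 1, 2, tzinfo=timezone.utc)},
]

print(get_datetime_bound_from_df(docs, 'dt'))                 # 2024-01-01 00:00:00+00:00
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 2024-01-02 00:00:00+00:00
```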
1643def get_unique_index_values( 1644 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1645 indices: List[str], 1646) -> Dict[str, List[Any]]: 1647 """ 1648 Return a dictionary of the unique index values in a DataFrame. 1649 1650 Parameters 1651 ---------- 1652 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1653 The dataframe (or list or dict) which contains index values. 1654 1655 indices: List[str] 1656 The list of index columns. 1657 1658 Returns 1659 ------- 1660 A dictionary mapping indices to unique values. 1661 """ 1662 if df is None: 1663 return {} 1664 if 'dataframe' in str(type(df)).lower(): 1665 pandas = mrsm.attempt_import('pandas') 1666 return { 1667 col: list({ 1668 (val if val is not pandas.NA else None) 1669 for val in df[col].unique() 1670 }) 1671 for col in indices 1672 if col in df.columns 1673 } 1674 1675 unique_indices = defaultdict(lambda: set()) 1676 if isinstance(df, list): 1677 for doc in df: 1678 for index in indices: 1679 if index in doc: 1680 unique_indices[index].add(doc[index]) 1681 1682 elif isinstance(df, dict): 1683 for index in indices: 1684 if index in df: 1685 unique_indices[index] = unique_indices[index].union(set(df[index])) 1686 1687 return {key: list(val) for key, val in unique_indices.items()}
Return a dictionary of the unique index values in a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
- indices (List[str]): The list of index columns.
Returns
- A dictionary mapping indices to unique values.
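For instance (note the returned lists are built from sets, so their order is not guaranteed):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'id': 1, 'color': 'red'},
    {'id': 2, 'color': 'red'},
    {'id': 2, 'color': 'blue'},
]

print(get_unique_index_values(docs, ['id', 'color']))
# e.g. {'id': [1, 2], 'color': ['blue', 'red']}
```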
1690def df_is_chunk_generator(df: Any) -> bool: 1691 """ 1692 Determine whether to treat `df` as a chunk generator. 1693 1694 Note this should only be used in a context where generators are expected, 1695 as it will return `True` for any iterable. 1696 1697 Parameters 1698 ---------- 1699 The DataFrame or chunk generator to evaluate. 1700 1701 Returns 1702 ------- 1703 A `bool` indicating whether to treat `df` as a generator. 1704 """ 1705 return ( 1706 not isinstance(df, (dict, list, str)) 1707 and 'DataFrame' not in str(type(df)) 1708 and isinstance(df, (Generator, Iterable, Iterator)) 1709 )
Determine whether to treat `df` as a chunk generator.
Note this should only be used in a context where generators are expected, as it will return `True` for any iterable.
Parameters
- df (Any): The DataFrame or chunk generator to evaluate.
Returns
- A `bool` indicating whether to treat `df` as a generator.
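A quick illustration of the distinction:

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

df = pd.DataFrame({'a': [1, 2, 3]})
chunks = (df.iloc[i:i+2] for i in range(0, len(df), 2))

print(df_is_chunk_generator(df))      # False: DataFrames are excluded explicitly
print(df_is_chunk_generator(chunks))  # True: any non-DataFrame iterable qualifies
```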
1712def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1713 """ 1714 Return the Dask `npartitions` value for a given `chunksize`. 1715 """ 1716 if chunksize == -1: 1717 from meerschaum.config import get_config 1718 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1719 if chunksize is None: 1720 return 1 1721 return -1 * chunksize
Return the Dask `npartitions` value for a given `chunksize`.
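Note the sign convention: a positive chunksize is returned negated, presumably so downstream callers can distinguish a chunk size from a literal partition count. A sketch of the observable behavior:

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))  # 1
print(chunksize_to_npartitions(500))   # -500
### A chunksize of -1 is first replaced by the configured SQL connector chunksize.
```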
1724def df_from_literal(
1725    pipe: Optional[mrsm.Pipe] = None,
1726    literal: Optional[str] = None,
1727    debug: bool = False
1728) -> 'pd.DataFrame':
1729    """
1730    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1731
1732    Parameters
1733    ----------
1734    pipe: Optional['meerschaum.Pipe'], default None
1735        The pipe which will consume the literal value.
1736
1737    Returns
1738    -------
1739    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1740    and the literal as the value.
1741    """
1742    from meerschaum.utils.packages import import_pandas
1743    from meerschaum.utils.warnings import error, warn
1744    from meerschaum.utils.debug import dprint
1745    from meerschaum.utils.dtypes import get_current_timestamp
1746
1747    if pipe is None or literal is None:
1748        error("Please provide a Pipe and a literal value")
1749
1750    dt_col = pipe.columns.get(
1751        'datetime',
1752        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1753    )
1754    val_col = pipe.get_val_column(debug=debug)
1755
1756    val = literal
1757    if isinstance(literal, str):
1758        if debug:
1759            dprint(f"Received literal string: '{literal}'")
1760        import ast
1761        try:
1762            val = ast.literal_eval(literal)
1763        except Exception:
1764            warn(
1765                "Failed to parse value from string:\n" + f"{literal}" +
1766                "\n\nWill cast as a string instead."
1767            )
1768            val = literal
1769
1770    now = get_current_timestamp(pipe.precision)
1771    pd = import_pandas()
1772    return pd.DataFrame({dt_col: [now], val_col: [val]})
Construct a dataframe from a literal value, using the pipe's datetime and value column names.
Parameters
- pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
- literal (Optional[str], default None): The literal value to parse (via `ast.literal_eval`) and store in the value column.
- debug (bool, default False): Verbosity toggle.
Returns
- A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
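An illustrative sketch; the pipe keys and columns below are hypothetical, and the value column name ultimately comes from the pipe's configuration:

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

### Hypothetical pipe; substitute your own connector and metric keys.
pipe = mrsm.Pipe('plugin:example', 'temperature', columns={'datetime': 'dt'})

### The string literal is parsed with `ast.literal_eval` (here to the float 42.1).
df = df_from_literal(pipe, '42.1')
print(df)  # one row: the current timestamp in 'dt' and 42.1 in the pipe's value column
```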
1775def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1776 """ 1777 Return the first valid Dask DataFrame partition (if possible). 1778 """ 1779 pdf = None 1780 for partition in ddf.partitions: 1781 try: 1782 pdf = partition.compute() 1783 except Exception: 1784 continue 1785 if len(pdf) > 0: 1786 return pdf 1787 _ = mrsm.attempt_import('partd', lazy=False) 1788 return ddf.compute()
Return the first valid Dask DataFrame partition (if possible).
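For example, with a small Dask DataFrame (assuming `dask` is installed):

```python
import pandas as pd
import dask.dataframe as dd
from meerschaum.utils.dataframe import get_first_valid_dask_partition

ddf = dd.from_pandas(pd.DataFrame({'a': range(10)}), npartitions=3)

### Computes partitions one at a time and returns the first non-empty one.
first = get_first_valid_dask_partition(ddf)
print(type(first), len(first))  # a pandas DataFrame holding the first non-empty partition
```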
1791def query_df( 1792 df: 'pd.DataFrame', 1793 params: Optional[Dict[str, Any]] = None, 1794 begin: Union[datetime, int, None] = None, 1795 end: Union[datetime, int, None] = None, 1796 datetime_column: Optional[str] = None, 1797 select_columns: Optional[List[str]] = None, 1798 omit_columns: Optional[List[str]] = None, 1799 inplace: bool = False, 1800 reset_index: bool = False, 1801 coerce_types: bool = False, 1802 debug: bool = False, 1803) -> 'pd.DataFrame': 1804 """ 1805 Query the dataframe with the params dictionary. 1806 1807 Parameters 1808 ---------- 1809 df: pd.DataFrame 1810 The DataFrame to query against. 1811 1812 params: Optional[Dict[str, Any]], default None 1813 The parameters dictionary to use for the query. 1814 1815 begin: Union[datetime, int, None], default None 1816 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1817 greater than or equal to this value. 1818 1819 end: Union[datetime, int, None], default None 1820 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1821 less than this value. 1822 1823 datetime_column: Optional[str], default None 1824 A `datetime_column` must be provided to use `begin` and `end`. 1825 1826 select_columns: Optional[List[str]], default None 1827 If provided, only return these columns. 1828 1829 omit_columns: Optional[List[str]], default None 1830 If provided, do not include these columns in the result. 1831 1832 inplace: bool, default False 1833 If `True`, modify the DataFrame inplace rather than creating a new DataFrame. 1834 1835 reset_index: bool, default False 1836 If `True`, reset the index in the resulting DataFrame. 1837 1838 coerce_types: bool, default False 1839 If `True`, cast the dataframe and parameters as strings before querying. 1840 1841 Returns 1842 ------- 1843 A Pandas DataFrame query result. 
1844     """
1845
1846     def _process_select_columns(_df):
1847         if not select_columns:
1848             return
1849         for col in list(_df.columns):
1850             if col not in select_columns:
1851                 del _df[col]
1852
1853     def _process_omit_columns(_df):
1854         if not omit_columns:
1855             return
1856         for col in list(_df.columns):
1857             if col in omit_columns:
1858                 del _df[col]
1859
1860     if not params and not begin and not end:
1861         if not inplace:
1862             df = df.copy()
1863         _process_select_columns(df)
1864         _process_omit_columns(df)
1865         return df
1866
1867     from meerschaum.utils.debug import dprint
1868     from meerschaum.utils.misc import get_in_ex_params
1869     from meerschaum.utils.warnings import warn
1870     from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1871     dateutil_parser = mrsm.attempt_import('dateutil.parser')
1872     pandas = mrsm.attempt_import('pandas')
1873     NA = pandas.NA
1874
1875     if params:
1876         proto_in_ex_params = get_in_ex_params(params)
1877         for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1878             if proto_ex_vals:
1879                 coerce_types = True
1880                 break
1881         params = params.copy()
1882         for key, val in {k: v for k, v in params.items()}.items():
1883             if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1884                 if None in val:
1885                     val = [item for item in val if item is not None] + [NA]
1886                 params[key] = val
1887                 if coerce_types:
1888                     params[key] = [str(x) for x in val]
1889             else:
1890                 if value_is_null(val):
1891                     val = NA
1892                     params[key] = NA
1893                 if coerce_types:
1894                     params[key] = str(val)
1895
1896     dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1897
1898     if inplace:
1899         df.fillna(NA, inplace=True)
1900     else:
1901         df = df.infer_objects().fillna(NA)
1902
1903     if isinstance(begin, str):
1904         begin = dateutil_parser.parse(begin)
1905     if isinstance(end, str):
1906         end = dateutil_parser.parse(end)
1907
1908     if begin is not None or end is not None:
1909         if not datetime_column or datetime_column not in df.columns:
1910             warn(
1911                 f"The datetime column '{datetime_column}' is not present in the Dataframe, "
1912                 + "ignoring begin and end...",
1913             )
1914             begin, end = None, None
1915
1916     if debug:
1917         dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1918
1919     if datetime_column and (begin is not None or end is not None):
1920         if debug:
1921             dprint("Checking for datetime column compatibility.")
1922
1923         from meerschaum.utils.dtypes import coerce_timezone
1924         df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1925         begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1926         end_is_int = are_dtypes_equal(str(type(end)), 'int')
1927
1928         if df_is_dt:
1929             df_tz = (
1930                 getattr(df[datetime_column].dt, 'tz', None)
1931                 if hasattr(df[datetime_column], 'dt')
1932                 else None
1933             )
1934
1935             if begin_is_int:
1936                 begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1937                 if debug:
1938                     dprint(f"`begin` will be cast to '{begin}'.")
1939             if end_is_int:
1940                 end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1941                 if debug:
1942                     dprint(f"`end` will be cast to '{end}'.")
1943
1944             begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1945             end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1946
1947     in_ex_params = get_in_ex_params(params)
1948
1949     masks = [
1950         (
1951             (df[datetime_column] >= begin)
1952             if begin is not None and datetime_column
1953             else True
1954         ) & (
1955             (df[datetime_column] < end)
1956             if end 
is not None and datetime_column 1957 else True 1958 ) 1959 ] 1960 1961 masks.extend([ 1962 ( 1963 ( 1964 (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals) 1965 if in_vals 1966 else True 1967 ) & ( 1968 ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals) 1969 if ex_vals 1970 else True 1971 ) 1972 ) 1973 for col, (in_vals, ex_vals) in in_ex_params.items() 1974 if col in df.columns 1975 ]) 1976 query_mask = masks[0] 1977 for mask in masks[1:]: 1978 query_mask = query_mask & mask 1979 1980 original_cols = df.columns 1981 1982 ### NOTE: We must cast bool columns to `boolean[pyarrow]` 1983 ### to allow for `<NA>` values. 1984 bool_cols = [ 1985 col 1986 for col, typ in df.dtypes.items() 1987 if are_dtypes_equal(str(typ), 'bool') 1988 ] 1989 for col in bool_cols: 1990 df[col] = df[col].astype('boolean[pyarrow]') 1991 1992 if not isinstance(query_mask, bool): 1993 df['__mrsm_mask'] = ( 1994 query_mask.astype('boolean[pyarrow]') 1995 if hasattr(query_mask, 'astype') 1996 else query_mask 1997 ) 1998 1999 if inplace: 2000 df.where(query_mask, other=NA, inplace=True) 2001 df.dropna(how='all', inplace=True) 2002 result_df = df 2003 else: 2004 result_df = df.where(query_mask, other=NA) 2005 result_df.dropna(how='all', inplace=True) 2006 2007 else: 2008 result_df = df 2009 2010 if '__mrsm_mask' in df.columns: 2011 del df['__mrsm_mask'] 2012 if '__mrsm_mask' in result_df.columns: 2013 del result_df['__mrsm_mask'] 2014 2015 if reset_index: 2016 result_df.reset_index(drop=True, inplace=True) 2017 2018 result_df = enforce_dtypes( 2019 result_df, 2020 dtypes, 2021 safe_copy=False, 2022 debug=debug, 2023 coerce_numeric=False, 2024 coerce_timezone=False, 2025 ) 2026 2027 if select_columns == ['*']: 2028 select_columns = None 2029 2030 if not select_columns and not omit_columns: 2031 return result_df[original_cols] 2032 2033 _process_select_columns(result_df) 2034 _process_omit_columns(result_df) 2035 2036 return result_df
Query the dataframe with the params dictionary.
Parameters
- df (pd.DataFrame): The DataFrame to query against.
- params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
- begin (Union[datetime, int, None], default None): If `begin` and `datetime_column` are provided, only return rows with a timestamp greater than or equal to this value.
- end (Union[datetime, int, None], default None): If `end` and `datetime_column` are provided, only return rows with a timestamp less than this value.
- datetime_column (Optional[str], default None): A `datetime_column` must be provided to use `begin` and `end`.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
- inplace (bool, default False): If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
- reset_index (bool, default False): If `True`, reset the index in the resulting DataFrame.
- coerce_types (bool, default False): If `True`, cast the dataframe and parameters as strings before querying.
- debug (bool, default False): Verbosity toggle.
Returns
- A Pandas DataFrame query result.
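A minimal sketch combining a parameters filter with a datetime range (illustrative values):

```python
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})

### Rows where color == 'red' and '2024-01-01' <= dt < '2024-01-03'.
result = query_df(
    df,
    params={'color': 'red'},
    begin='2024-01-01',
    end='2024-01-03',
    datetime_column='dt',
    reset_index=True,
)
print(result)  # a single row: 2024-01-01, 'red'
```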
2039def to_json( 2040 df: 'pd.DataFrame', 2041 safe_copy: bool = True, 2042 orient: str = 'records', 2043 date_format: str = 'iso', 2044 date_unit: str = 'us', 2045 double_precision: int = 15, 2046 geometry_format: str = 'geojson', 2047 **kwargs: Any 2048) -> str: 2049 """ 2050 Serialize the given dataframe as a JSON string. 2051 2052 Parameters 2053 ---------- 2054 df: pd.DataFrame 2055 The DataFrame to be serialized. 2056 2057 safe_copy: bool, default True 2058 If `False`, modify the DataFrame inplace. 2059 2060 date_format: str, default 'iso' 2061 The default format for timestamps. 2062 2063 date_unit: str, default 'us' 2064 The precision of the timestamps. 2065 2066 double_precision: int, default 15 2067 The number of decimal places to use when encoding floating point values (maximum 15). 2068 2069 geometry_format: str, default 'geojson' 2070 The serialization format for geometry data. 2071 Accepted values are `geojson`, `wkb_hex`, and `wkt`. 2072 2073 Returns 2074 ------- 2075 A JSON string. 2076 """ 2077 import warnings 2078 import functools 2079 from meerschaum.utils.packages import import_pandas 2080 from meerschaum.utils.dtypes import ( 2081 serialize_bytes, 2082 serialize_decimal, 2083 serialize_geometry, 2084 ) 2085 pd = import_pandas() 2086 uuid_cols = get_uuid_cols(df) 2087 bytes_cols = get_bytes_cols(df) 2088 numeric_cols = get_numeric_cols(df) 2089 geometry_cols = get_geometry_cols(df) 2090 geometry_cols_srids = { 2091 col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0') 2092 for col in geometry_cols 2093 } if 'geodataframe' in str(type(df)).lower() else {} 2094 if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols): 2095 df = df.copy() 2096 for col in uuid_cols: 2097 df[col] = df[col].astype(str) 2098 for col in bytes_cols: 2099 df[col] = df[col].apply(serialize_bytes) 2100 for col in numeric_cols: 2101 df[col] = df[col].apply(serialize_decimal) 2102 with warnings.catch_warnings(): 2103 warnings.simplefilter("ignore") 2104 for col in geometry_cols: 2105 srid = geometry_cols_srids.get(col, None) or None 2106 df[col] = df[col].apply( 2107 functools.partial( 2108 serialize_geometry, 2109 geometry_format=geometry_format, 2110 srid=srid, 2111 ) 2112 ) 2113 return df.infer_objects(copy=False).fillna(pd.NA).to_json( 2114 date_format=date_format, 2115 date_unit=date_unit, 2116 double_precision=double_precision, 2117 orient=orient, 2118 **kwargs 2119 )
Serialize the given dataframe as a JSON string.
Parameters
- df (pd.DataFrame): The DataFrame to be serialized.
- safe_copy (bool, default True): If `False`, modify the DataFrame inplace.
- orient (str, default 'records'): The JSON orientation passed to `DataFrame.to_json()`.
- date_format (str, default 'iso'): The default format for timestamps.
- date_unit (str, default 'us'): The precision of the timestamps.
- double_precision (int, default 15): The number of decimal places to use when encoding floating point values (maximum 15).
- geometry_format (str, default 'geojson'): The serialization format for geometry data. Accepted values are `geojson`, `wkb_hex`, and `wkt`.
Returns
- A JSON string.
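For example (the exact output below is illustrative; decimals are serialized as strings before dumping):

```python
import pandas as pd
from decimal import Decimal
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01']),
    'amount': [Decimal('1.10')],
})

print(to_json(df))
# e.g. '[{"dt":"2024-01-01T00:00:00.000000","amount":"1.10"}]'
```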
2122def to_simple_lines(df: 'pd.DataFrame') -> str: 2123 """ 2124 Serialize a Pandas Dataframe as lines of simple dictionaries. 2125 2126 Parameters 2127 ---------- 2128 df: pd.DataFrame 2129 The dataframe to serialize into simple lines text. 2130 2131 Returns 2132 ------- 2133 A string of simple line dictionaries joined by newlines. 2134 """ 2135 from meerschaum.utils.misc import to_simple_dict 2136 if df is None or len(df) == 0: 2137 return '' 2138 2139 docs = df.to_dict(orient='records') 2140 return '\n'.join(to_simple_dict(doc) for doc in docs)
Serialize a Pandas Dataframe as lines of simple dictionaries.
Parameters
- df (pd.DataFrame): The dataframe to serialize into simple lines text.
Returns
- A string of simple line dictionaries joined by newlines.
2143def parse_simple_lines(data: str) -> 'pd.DataFrame': 2144 """ 2145 Parse simple lines text into a DataFrame. 2146 2147 Parameters 2148 ---------- 2149 data: str 2150 The simple lines text to parse into a DataFrame. 2151 2152 Returns 2153 ------- 2154 A dataframe containing the rows serialized in `data`. 2155 """ 2156 from meerschaum.utils.misc import string_to_dict 2157 from meerschaum.utils.packages import import_pandas 2158 pd = import_pandas() 2159 lines = data.splitlines() 2160 try: 2161 docs = [string_to_dict(line) for line in lines] 2162 df = pd.DataFrame(docs) 2163 except Exception: 2164 df = None 2165 2166 if df is None: 2167 raise ValueError("Cannot parse simple lines into a dataframe.") 2168 2169 return df
Parse simple lines text into a DataFrame.
Parameters
- data (str): The simple lines text to parse into a DataFrame.
Returns
- A dataframe containing the rows serialized in `data`.
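The two functions round-trip, as in this sketch (the intermediate simple-lines format is produced by `meerschaum.utils.misc.to_simple_dict`):

```python
import pandas as pd
from meerschaum.utils.dataframe import to_simple_lines, parse_simple_lines

df = pd.DataFrame([{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}])

lines = to_simple_lines(df)        # one simple-dict line per row
roundtrip = parse_simple_lines(lines)
print(roundtrip.equals(df))        # expected True if values and dtypes survive the round-trip
```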