meerschaum.utils.dataframe
Utility functions for working with DataFrames.
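Below is a brief usage sketch (the frames are illustrative; `filter_unseen_df` is defined in this module, and the expected output follows its own docstring example):

```python
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import filter_unseen_df
>>> old_df = pd.DataFrame({'a': [1, 2]})
>>> new_df = pd.DataFrame({'a': [2, 3]})
>>> filter_unseen_df(old_df, new_df)
   a
0  3
```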
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations

from datetime import datetime, timezone, date
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING, Tuple,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which the null columns will be added.

    dtypes:
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    coerce_mixed_numerics: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    coerce_mixed_numerics: bool, default True
        If `True`, cast mixed integer and float columns between the old and new dataframes
        into numeric values (`decimal.Decimal`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone,
        serialize_decimal,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    numeric_cols_precisions_scales = {
        col: get_numeric_precision_scale(None, typ)
        for col, typ in dtypes.items()
        if col and str(typ).lower().startswith('numeric')
    }

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
            strip_utc = (
                _dtypes_col_dtype.startswith('datetime64')
                and 'utc' not in _dtypes_col_dtype.lower()
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                coerce_mixed_numerics
                and (new_is_float or new_is_int or new_is_numeric)
                and (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    old_geometry_cols = get_geometry_cols(old_df)
    new_geometry_cols = get_geometry_cols(new_df)
    geometry_cols = set(new_geometry_cols + old_geometry_cols)

    na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$'
    joined_df = merge(
        new_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA),
        old_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    delta_json_cols = get_json_cols(delta_df)
    for json_col in json_cols:
        if (
            json_col in delta_json_cols
            or json_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(
                lambda x: (json.loads(x) if isinstance(x, str) else x)
            )
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    delta_numeric_cols = get_numeric_cols(delta_df)
    for numeric_col in numeric_cols:
        if (
            numeric_col in delta_numeric_cols
            or numeric_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    delta_uuid_cols = get_uuid_cols(delta_df)
    for uuid_col in uuid_cols:
        if (
            uuid_col in delta_uuid_cols
            or uuid_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    delta_bytes_cols = get_bytes_cols(delta_df)
    for bytes_col in bytes_cols:
        if (
            bytes_col in delta_bytes_cols
            or bytes_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    delta_geometry_cols = get_geometry_cols(delta_df)
    for geometry_col in geometry_cols:
        if (
            geometry_col in delta_geometry_cols
            or geometry_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col])
        except Exception:
            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    ignore_all: bool = False,
    precision_unit: Optional[str] = None,
    coerce_utc: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    ignore_all: bool, default False
        If `True`, do not attempt to cast any columns to datetimes.

    precision_unit: Optional[str], default None
        If provided, enforce the given precision on the coerced datetime columns.

    coerce_utc: bool, default True
        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df2 = parse_df_datetimes(df)
    >>> df2.dtypes
    a    datetime64[us, UTC]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
    import traceback

    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [
        col
        for col in pdf.columns
        if col not in ignore_cols
    ] if not ignore_all else []

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.infer_objects().fillna(pandas.NA)

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.infer_objects().fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    def _parse_to_datetime(x):
        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                _parse_to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: MRSM_PD_DTYPES['datetime']
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain dict or list objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
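
    Examples
    --------
    A minimal illustration (detection samples the first valid row of each column):

    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': {'b': 1}, 'c': 'text'}])
    >>> get_json_cols(df)
    ['a']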
683 """ 684 if df is None: 685 return [] 686 687 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 688 if is_dask: 689 df = get_first_valid_dask_partition(df) 690 691 if len(df) == 0: 692 return [] 693 694 cols_indices = { 695 col: df[col].first_valid_index() 696 for col in df.columns 697 } 698 return [ 699 col 700 for col, ix in cols_indices.items() 701 if ( 702 ix is not None 703 and isinstance(df.loc[ix][col], (dict, list)) 704 ) 705 ] 706 707 708def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 709 """ 710 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 711 712 Parameters 713 ---------- 714 df: pd.DataFrame 715 The DataFrame which may contain decimal objects. 716 717 Returns 718 ------- 719 A list of columns to treat as numerics. 720 """ 721 if df is None: 722 return [] 723 from decimal import Decimal 724 is_dask = 'dask' in df.__module__ 725 if is_dask: 726 df = get_first_valid_dask_partition(df) 727 728 if len(df) == 0: 729 return [] 730 731 cols_indices = { 732 col: df[col].first_valid_index() 733 for col in df.columns 734 } 735 return [ 736 col 737 for col, ix in cols_indices.items() 738 if ( 739 ix is not None 740 and 741 isinstance(df.loc[ix][col], Decimal) 742 ) 743 ] 744 745 746def get_bool_cols(df: 'pd.DataFrame') -> List[str]: 747 """ 748 Get the columns which contain `bool` objects from a Pandas DataFrame. 749 750 Parameters 751 ---------- 752 df: pd.DataFrame 753 The DataFrame which may contain bools. 754 755 Returns 756 ------- 757 A list of columns to treat as bools. 758 """ 759 if df is None: 760 return [] 761 762 is_dask = 'dask' in df.__module__ 763 if is_dask: 764 df = get_first_valid_dask_partition(df) 765 766 if len(df) == 0: 767 return [] 768 769 from meerschaum.utils.dtypes import are_dtypes_equal 770 771 return [ 772 col 773 for col, typ in df.dtypes.items() 774 if are_dtypes_equal(str(typ), 'bool') 775 ] 776 777 778def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 779 """ 780 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 781 782 Parameters 783 ---------- 784 df: pd.DataFrame 785 The DataFrame which may contain UUID objects. 786 787 Returns 788 ------- 789 A list of columns to treat as UUIDs. 790 """ 791 if df is None: 792 return [] 793 from uuid import UUID 794 is_dask = 'dask' in df.__module__ 795 if is_dask: 796 df = get_first_valid_dask_partition(df) 797 798 if len(df) == 0: 799 return [] 800 801 cols_indices = { 802 col: df[col].first_valid_index() 803 for col in df.columns 804 } 805 return [ 806 col 807 for col, ix in cols_indices.items() 808 if ( 809 ix is not None 810 and 811 isinstance(df.loc[ix][col], UUID) 812 ) 813 ] 814 815 816def get_datetime_cols( 817 df: 'pd.DataFrame', 818 timezone_aware: bool = True, 819 timezone_naive: bool = True, 820 with_tz_precision: bool = False, 821) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]: 822 """ 823 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 824 825 Parameters 826 ---------- 827 df: pd.DataFrame 828 The DataFrame which may contain `datetime` or `Timestamp` objects. 829 830 timezone_aware: bool, default True 831 If `True`, include timezone-aware datetime columns. 832 833 timezone_naive: bool, default True 834 If `True`, include timezone-naive datetime columns. 835 836 with_tz_precision: bool, default False 837 If `True`, return a dictionary mapping column names to tuples in the form 838 `(timezone, precision)`. 

    Returns
    -------
    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
    (if `with_tz_precision` is `True`).
    """
    if not timezone_aware and not timezone_naive:
        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")

    if df is None:
        return [] if not with_tz_precision else {}

    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
        """
        Extract the tz + precision tuple from a dtype string.
        """
        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
        tz = (
            None
            if ',' not in meta_str
            else meta_str.split(',', maxsplit=1)[-1]
        )
        precision_abbreviation = (
            meta_str
            if ',' not in meta_str
            else meta_str.split(',')[0]
        )
        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
        return tz, precision

    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
        """
        Return the tz + precision tuple from a Python datetime object.
        """
        return dt.tzname(), 'microsecond'

    known_dt_cols_types = {
        col: str(typ)
        for col, typ in df.dtypes.items()
        if are_dtypes_equal('datetime', str(typ))
    }

    known_dt_cols_tuples = {
        col: get_tz_precision_from_dtype(typ)
        for col, typ in known_dt_cols_types.items()
    }

    if len(df) == 0:
        return (
            list(known_dt_cols_types)
            if not with_tz_precision
            else known_dt_cols_tuples
        )

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_dt_cols_types
    }
    pydt_cols_tuples = {
        col: get_tz_precision_from_datetime(sample_val)
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance((sample_val := df.loc[ix][col]), datetime)
        )
    }

    dt_cols_tuples = {
        **known_dt_cols_tuples,
        **pydt_cols_tuples
    }

    all_dt_cols_tuples = {
        col: dt_cols_tuples[col]
        for col in df.columns
        if col in dt_cols_tuples
    }
    if timezone_aware and timezone_naive:
        return (
            list(all_dt_cols_tuples)
            if not with_tz_precision
            else all_dt_cols_tuples
        )

    known_timezone_aware_dt_cols = [
        col
        for col in known_dt_cols_types
        if getattr(df[col], 'tz', None) is not None
    ]
    timezone_aware_pydt_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in pydt_cols_tuples.items()
        if df.loc[cols_indices[col]][col].tzinfo is not None
    }
    timezone_aware_dt_cols_set = set(
        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
    )
    timezone_aware_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in all_dt_cols_tuples.items()
        if col in timezone_aware_dt_cols_set
    }
    timezone_naive_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in all_dt_cols_tuples.items()
        if col not in timezone_aware_dt_cols_set
    }

    if timezone_aware:
        return (
            list(timezone_aware_cols_tuples)
            if not with_tz_precision
            else timezone_aware_cols_tuples
        )

    return (
        list(timezone_naive_cols_tuples)
        if not with_tz_precision
        else timezone_naive_cols_tuples
    )


def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
""" 970 Return a dictionary mapping datetime columns to specific types strings. 971 972 Parameters 973 ---------- 974 df: pd.DataFrame 975 The DataFrame which may contain datetime columns. 976 977 Returns 978 ------- 979 A dictionary mapping the datetime columns' names to dtype strings 980 (containing timezone and precision metadata). 981 982 Examples 983 -------- 984 >>> from datetime import datetime, timezone 985 >>> import pandas as pd 986 >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]}) 987 >>> get_datetime_cols_types(df) 988 {'dt_tz_aware': 'datetime64[us, UTC]'} 989 >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]}) 990 >>> get_datetime_cols_types(df) 991 {'distant_dt': 'datetime64[us]'} 992 >>> df = pd.DataFrame({'dt_second': datetime(2025, 1, 1)}) 993 >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]') 994 >>> get_datetime_cols_types(df) 995 {'dt_second': 'datetime64[s]'} 996 """ 997 from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS 998 dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True) 999 if not dt_cols_tuples: 1000 return {} 1001 1002 return { 1003 col: ( 1004 f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]" 1005 if tz is None 1006 else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]" 1007 ) 1008 for col, (tz, precision) in dt_cols_tuples.items() 1009 } 1010 1011 1012def get_date_cols(df: 'pd.DataFrame') -> List[str]: 1013 """ 1014 Get the `date` columns from a Pandas DataFrame. 1015 1016 Parameters 1017 ---------- 1018 df: pd.DataFrame 1019 The DataFrame which may contain dates. 1020 1021 Returns 1022 ------- 1023 A list of columns to treat as dates. 1024 """ 1025 from meerschaum.utils.dtypes import are_dtypes_equal 1026 if df is None: 1027 return [] 1028 1029 is_dask = 'dask' in df.__module__ 1030 if is_dask: 1031 df = get_first_valid_dask_partition(df) 1032 1033 known_date_cols = [ 1034 col 1035 for col, typ in df.dtypes.items() 1036 if are_dtypes_equal(typ, 'date') 1037 ] 1038 1039 if len(df) == 0: 1040 return known_date_cols 1041 1042 cols_indices = { 1043 col: df[col].first_valid_index() 1044 for col in df.columns 1045 if col not in known_date_cols 1046 } 1047 object_date_cols = [ 1048 col 1049 for col, ix in cols_indices.items() 1050 if ( 1051 ix is not None 1052 and isinstance(df.loc[ix][col], date) 1053 ) 1054 ] 1055 1056 all_date_cols = set(known_date_cols + object_date_cols) 1057 1058 return [ 1059 col 1060 for col in df.columns 1061 if col in all_date_cols 1062 ] 1063 1064 1065def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 1066 """ 1067 Get the columns which contain bytes strings from a Pandas DataFrame. 1068 1069 Parameters 1070 ---------- 1071 df: pd.DataFrame 1072 The DataFrame which may contain bytes strings. 1073 1074 Returns 1075 ------- 1076 A list of columns to treat as bytes. 
1077 """ 1078 if df is None: 1079 return [] 1080 1081 is_dask = 'dask' in df.__module__ 1082 if is_dask: 1083 df = get_first_valid_dask_partition(df) 1084 1085 known_bytes_cols = [ 1086 col 1087 for col, typ in df.dtypes.items() 1088 if str(typ) == 'binary[pyarrow]' 1089 ] 1090 1091 if len(df) == 0: 1092 return known_bytes_cols 1093 1094 cols_indices = { 1095 col: df[col].first_valid_index() 1096 for col in df.columns 1097 if col not in known_bytes_cols 1098 } 1099 object_bytes_cols = [ 1100 col 1101 for col, ix in cols_indices.items() 1102 if ( 1103 ix is not None 1104 and isinstance(df.loc[ix][col], bytes) 1105 ) 1106 ] 1107 1108 all_bytes_cols = set(known_bytes_cols + object_bytes_cols) 1109 1110 return [ 1111 col 1112 for col in df.columns 1113 if col in all_bytes_cols 1114 ] 1115 1116 1117def get_geometry_cols( 1118 df: 'pd.DataFrame', 1119 with_types_srids: bool = False, 1120) -> Union[List[str], Dict[str, Any]]: 1121 """ 1122 Get the columns which contain shapely objects from a Pandas DataFrame. 1123 1124 Parameters 1125 ---------- 1126 df: pd.DataFrame 1127 The DataFrame which may contain bytes strings. 1128 1129 with_types_srids: bool, default False 1130 If `True`, return a dictionary mapping columns to geometry types and SRIDs. 1131 1132 Returns 1133 ------- 1134 A list of columns to treat as `geometry`. 1135 If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID). 1136 """ 1137 if df is None: 1138 return [] if not with_types_srids else {} 1139 1140 is_dask = 'dask' in df.__module__ 1141 if is_dask: 1142 df = get_first_valid_dask_partition(df) 1143 1144 if len(df) == 0: 1145 return [] if not with_types_srids else {} 1146 1147 cols_indices = { 1148 col: df[col].first_valid_index() 1149 for col in df.columns 1150 } 1151 geo_cols = [ 1152 col 1153 for col, ix in cols_indices.items() 1154 if ( 1155 ix is not None 1156 and 1157 'shapely' in str(type(df.loc[ix][col])) 1158 ) 1159 ] 1160 if not with_types_srids: 1161 return geo_cols 1162 1163 gpd = mrsm.attempt_import('geopandas', lazy=False) 1164 geo_cols_types_srids = {} 1165 for col in geo_cols: 1166 try: 1167 sample_geo_series = gpd.GeoSeries(df[col], crs=None) 1168 geometry_types = { 1169 geom.geom_type 1170 for geom in sample_geo_series 1171 if hasattr(geom, 'geom_type') 1172 } 1173 geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series) 1174 srid = ( 1175 ( 1176 sample_geo_series.crs.sub_crs_list[0].to_epsg() 1177 if sample_geo_series.crs.is_compound 1178 else sample_geo_series.crs.to_epsg() 1179 ) 1180 if sample_geo_series.crs 1181 else 0 1182 ) 1183 geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry' 1184 if geometry_type != 'geometry' and geometry_has_z: 1185 geometry_type = geometry_type + 'Z' 1186 except Exception: 1187 srid = 0 1188 geometry_type = 'geometry' 1189 geo_cols_types_srids[col] = (geometry_type, srid) 1190 1191 return geo_cols_types_srids 1192 1193 1194def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 1195 """ 1196 Return a dtypes dictionary mapping columns to specific geometry types (type, srid). 
1197 """ 1198 geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True) 1199 new_cols_types = {} 1200 for col, (geometry_type, srid) in geometry_cols_types_srids.items(): 1201 new_dtype = "geometry" 1202 modifier = "" 1203 if not srid and geometry_type.lower() == 'geometry': 1204 new_cols_types[col] = new_dtype 1205 continue 1206 1207 modifier = "[" 1208 if geometry_type.lower() != 'geometry': 1209 modifier += f"{geometry_type}" 1210 1211 if srid: 1212 if modifier != '[': 1213 modifier += ", " 1214 modifier += f"{srid}" 1215 modifier += "]" 1216 new_cols_types[col] = f"{new_dtype}{modifier}" 1217 return new_cols_types 1218 1219 1220def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]: 1221 """ 1222 Return a dtypes dictionary mapping special columns to their dtypes. 1223 """ 1224 return { 1225 **{col: 'json' for col in get_json_cols(df)}, 1226 **{col: 'uuid' for col in get_uuid_cols(df)}, 1227 **{col: 'bytes' for col in get_bytes_cols(df)}, 1228 **{col: 'bool' for col in get_bool_cols(df)}, 1229 **{col: 'numeric' for col in get_numeric_cols(df)}, 1230 **{col: 'date' for col in get_date_cols(df)}, 1231 **get_datetime_cols_types(df), 1232 **get_geometry_cols_types(df), 1233 } 1234 1235 1236def enforce_dtypes( 1237 df: 'pd.DataFrame', 1238 dtypes: Dict[str, str], 1239 explicit_dtypes: Optional[Dict[str, str]] = None, 1240 safe_copy: bool = True, 1241 coerce_numeric: bool = False, 1242 coerce_timezone: bool = True, 1243 strip_timezone: bool = False, 1244 debug: bool = False, 1245) -> 'pd.DataFrame': 1246 """ 1247 Enforce the `dtypes` dictionary on a DataFrame. 1248 1249 Parameters 1250 ---------- 1251 df: pd.DataFrame 1252 The DataFrame on which to enforce dtypes. 1253 1254 dtypes: Dict[str, str] 1255 The data types to attempt to enforce on the DataFrame. 1256 1257 explicit_dtypes: Optional[Dict[str, str]], default None 1258 If provided, automatic dtype coersion will respect explicitly configured 1259 dtypes (`int`, `float`, `numeric`). 1260 1261 safe_copy: bool, default True 1262 If `True`, create a copy before comparing and modifying the dataframes. 1263 Setting to `False` may mutate the DataFrames. 1264 See `meerschaum.utils.dataframe.filter_unseen_df`. 1265 1266 coerce_numeric: bool, default False 1267 If `True`, convert float and int collisions to numeric. 1268 1269 coerce_timezone: bool, default True 1270 If `True`, convert datetimes to UTC. 1271 1272 strip_timezone: bool, default False 1273 If `coerce_timezone` and `strip_timezone` are `True`, 1274 remove timezone information from datetimes. 1275 1276 debug: bool, default False 1277 Verbosity toggle. 1278 1279 Returns 1280 ------- 1281 The Pandas DataFrame with the types enforced. 1282 """ 1283 import json 1284 import functools 1285 from meerschaum.utils.debug import dprint 1286 from meerschaum.utils.formatting import pprint 1287 from meerschaum.utils.dtypes import ( 1288 are_dtypes_equal, 1289 to_pandas_dtype, 1290 is_dtype_numeric, 1291 attempt_cast_to_numeric, 1292 attempt_cast_to_uuid, 1293 attempt_cast_to_bytes, 1294 attempt_cast_to_geometry, 1295 coerce_timezone as _coerce_timezone, 1296 get_geometry_type_srid, 1297 ) 1298 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 1299 pandas = mrsm.attempt_import('pandas') 1300 is_dask = 'dask' in df.__module__ 1301 if safe_copy: 1302 df = df.copy() 1303 if len(df.columns) == 0: 1304 if debug: 1305 dprint("Incoming DataFrame has no columns. 
        return df

    explicit_dtypes = explicit_dtypes or {}
    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('numeric')
    ]
    geometry_cols_types_srids = {
        col: get_geometry_type_srid(typ, default_srid=0)
        for col, typ in dtypes.items()
        if typ.startswith('geometry') or typ.startswith('geography')
    }
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    bytes_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'bytes'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        functools.partial(
                            attempt_cast_to_numeric,
                            quantize=True,
                            precision=precision,
                            scale=scale,
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if bytes_cols:
        if debug:
            dprint(f"Checking for bytes: {bytes_cols}")
        for col in bytes_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_bytes)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
                    if debug:
                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
                    continue
                if strip_timezone and ',' not in str(df.dtypes[col]):
                    if debug:
                        dprint(
                            f"Skip UTC coercion (stripped) for column '{col}' "
                            f"({str(df[col].dtype)})."
                        )
                    continue

                if debug:
                    dprint(
                        f"Data type for column '{col}' before timezone coercion: "
                        f"{str(df[col].dtype)}"
                    )

                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
                if debug:
                    dprint(
                        f"Data type for column '{col}' after timezone coercion: "
                        f"{str(df[col].dtype)}"
                    )

    if geometry_cols_types_srids:
        geopandas = mrsm.attempt_import('geopandas')
        if debug:
            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
        parsed_geom_cols = []
        for col in geometry_cols_types_srids:
            if col not in df.columns:
                continue
            try:
                df[col] = attempt_cast_to_geometry(df[col])
                parsed_geom_cols.append(col)
            except Exception as e:
                import traceback
                traceback.print_exc()
                if debug:
                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")

        if parsed_geom_cols:
            if debug:
                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
            try:
                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
                for col, (_, srid) in geometry_cols_types_srids.items():
                    if srid:
                        if debug:
                            dprint(f"Setting '{col}' to SRID '{srid}'...")
                        _ = df[col].set_crs(srid)
                if parsed_geom_cols[0] not in df.columns:
                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
            except (ValueError, TypeError):
                import traceback
                dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
        all_nan = (
            df[col].isnull().all()
            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
            else None
        )
        cast_to_numeric = explicitly_numeric or (
            col in df_numeric_cols
            or (
                mixed_numeric_types
                and not (explicitly_float or explicitly_int)
                and not all_nan
                and coerce_numeric
            )
        )

        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
            from meerschaum.utils.formatting import make_header
            msg = (
                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
                + "\n"
                + f" Previous type: {previous_typ}\n"
                + f" Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
                + ("\n Column is explicitly numeric." if explicitly_numeric else "")
            ) if cast_to_numeric else (
                f"Will not coerce column '{col}' to numeric.\n"
                f" Numeric columns in dataframe: {df_numeric_cols}\n"
                f" Mixed numeric types: {mixed_numeric_types}\n"
                f" Explicitly float: {explicitly_float}\n"
                f" Explicitly int: {explicitly_int}\n"
                f" All NaN: {all_nan}\n"
                f" Coerce numeric: {coerce_numeric}"
            )
            dprint(msg)

        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
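
    Examples
    --------
    A minimal sketch using the list-of-documents branch:

    >>> docs = [{'dt': 3}, {'dt': 1}, {'dt': 2}]
    >>> get_datetime_bound_from_df(docs, 'dt')
    1
    >>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
    3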
1603 """ 1604 from meerschaum.utils.dtypes import to_datetime, value_is_null 1605 1606 if df is None: 1607 return None 1608 if not datetime_column: 1609 return None 1610 1611 def compare(a, b): 1612 if a is None: 1613 return b 1614 if b is None: 1615 return a 1616 if minimum: 1617 return a if a < b else b 1618 return a if a > b else b 1619 1620 if isinstance(df, list): 1621 if len(df) == 0: 1622 return None 1623 best_yet = df[0].get(datetime_column, None) 1624 for doc in df: 1625 val = doc.get(datetime_column, None) 1626 best_yet = compare(best_yet, val) 1627 return best_yet 1628 1629 if isinstance(df, dict): 1630 if datetime_column not in df: 1631 return None 1632 best_yet = df[datetime_column][0] 1633 for val in df[datetime_column]: 1634 best_yet = compare(best_yet, val) 1635 return best_yet 1636 1637 if 'DataFrame' in str(type(df)): 1638 from meerschaum.utils.dtypes import are_dtypes_equal 1639 pandas = mrsm.attempt_import('pandas') 1640 is_dask = 'dask' in df.__module__ 1641 1642 if datetime_column not in df.columns: 1643 return None 1644 1645 try: 1646 dt_val = ( 1647 df[datetime_column].min(skipna=True) 1648 if minimum 1649 else df[datetime_column].max(skipna=True) 1650 ) 1651 except Exception: 1652 dt_val = pandas.NA 1653 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1654 dt_val = dt_val.compute() 1655 1656 return ( 1657 to_datetime(dt_val, as_pydatetime=True) 1658 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1659 else (dt_val if not value_is_null(dt_val) else None) 1660 ) 1661 1662 return None 1663 1664 1665def get_unique_index_values( 1666 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1667 indices: List[str], 1668) -> Dict[str, List[Any]]: 1669 """ 1670 Return a dictionary of the unique index values in a DataFrame. 1671 1672 Parameters 1673 ---------- 1674 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1675 The dataframe (or list or dict) which contains index values. 1676 1677 indices: List[str] 1678 The list of index columns. 1679 1680 Returns 1681 ------- 1682 A dictionary mapping indices to unique values. 1683 """ 1684 if df is None: 1685 return {} 1686 if 'dataframe' in str(type(df)).lower(): 1687 pandas = mrsm.attempt_import('pandas') 1688 return { 1689 col: list({ 1690 (val if val is not pandas.NA else None) 1691 for val in df[col].unique() 1692 }) 1693 for col in indices 1694 if col in df.columns 1695 } 1696 1697 unique_indices = defaultdict(lambda: set()) 1698 if isinstance(df, list): 1699 for doc in df: 1700 for index in indices: 1701 if index in doc: 1702 unique_indices[index].add(doc[index]) 1703 1704 elif isinstance(df, dict): 1705 for index in indices: 1706 if index in df: 1707 unique_indices[index] = unique_indices[index].union(set(df[index])) 1708 1709 return {key: list(val) for key, val in unique_indices.items()} 1710 1711 1712def df_is_chunk_generator(df: Any) -> bool: 1713 """ 1714 Determine whether to treat `df` as a chunk generator. 1715 1716 Note this should only be used in a context where generators are expected, 1717 as it will return `True` for any iterable. 1718 1719 Parameters 1720 ---------- 1721 The DataFrame or chunk generator to evaluate. 1722 1723 Returns 1724 ------- 1725 A `bool` indicating whether to treat `df` as a generator. 
1726 """ 1727 return ( 1728 not isinstance(df, (dict, list, str)) 1729 and 'DataFrame' not in str(type(df)) 1730 and isinstance(df, (Generator, Iterable, Iterator)) 1731 ) 1732 1733 1734def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1735 """ 1736 Return the Dask `npartitions` value for a given `chunksize`. 1737 """ 1738 if chunksize == -1: 1739 from meerschaum.config import get_config 1740 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1741 if chunksize is None: 1742 return 1 1743 return -1 * chunksize 1744 1745 1746def df_from_literal( 1747 pipe: Optional[mrsm.Pipe] = None, 1748 literal: Optional[str] = None, 1749 debug: bool = False 1750) -> 'pd.DataFrame': 1751 """ 1752 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1753 1754 Parameters 1755 ---------- 1756 pipe: Optional['meerschaum.Pipe'], default None 1757 The pipe which will consume the literal value. 1758 1759 Returns 1760 ------- 1761 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1762 and the literal as the value. 1763 """ 1764 from meerschaum.utils.packages import import_pandas 1765 from meerschaum.utils.warnings import error, warn 1766 from meerschaum.utils.debug import dprint 1767 from meerschaum.utils.dtypes import get_current_timestamp 1768 1769 if pipe is None or literal is None: 1770 error("Please provide a Pipe and a literal value") 1771 1772 dt_col = pipe.columns.get( 1773 'datetime', 1774 mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing') 1775 ) 1776 val_col = pipe.get_val_column(debug=debug) 1777 1778 val = literal 1779 if isinstance(literal, str): 1780 if debug: 1781 dprint(f"Received literal string: '{literal}'") 1782 import ast 1783 try: 1784 val = ast.literal_eval(literal) 1785 except Exception: 1786 warn( 1787 "Failed to parse value from string:\n" + f"{literal}" + 1788 "\n\nWill cast as a string instead."\ 1789 ) 1790 val = literal 1791 1792 now = get_current_timestamp(pipe.precision) 1793 pd = import_pandas() 1794 return pd.DataFrame({dt_col: [now], val_col: [val]}) 1795 1796 1797def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1798 """ 1799 Return the first valid Dask DataFrame partition (if possible). 1800 """ 1801 pdf = None 1802 for partition in ddf.partitions: 1803 try: 1804 pdf = partition.compute() 1805 except Exception: 1806 continue 1807 if len(pdf) > 0: 1808 return pdf 1809 _ = mrsm.attempt_import('partd', lazy=False) 1810 return ddf.compute() 1811 1812 1813def query_df( 1814 df: 'pd.DataFrame', 1815 params: Optional[Dict[str, Any]] = None, 1816 begin: Union[datetime, int, None] = None, 1817 end: Union[datetime, int, None] = None, 1818 datetime_column: Optional[str] = None, 1819 select_columns: Optional[List[str]] = None, 1820 omit_columns: Optional[List[str]] = None, 1821 inplace: bool = False, 1822 reset_index: bool = False, 1823 coerce_types: bool = False, 1824 debug: bool = False, 1825) -> 'pd.DataFrame': 1826 """ 1827 Query the dataframe with the params dictionary. 1828 1829 Parameters 1830 ---------- 1831 df: pd.DataFrame 1832 The DataFrame to query against. 1833 1834 params: Optional[Dict[str, Any]], default None 1835 The parameters dictionary to use for the query. 1836 1837 begin: Union[datetime, int, None], default None 1838 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1839 greater than or equal to this value. 

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        proto_in_ex_params = get_in_ex_params(params)
        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
            if proto_ex_vals:
                coerce_types = True
                break
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')
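
        # NOTE: Integer bounds are interpreted below as Unix epoch seconds (UTC),
        # then coerced to match the datetime column's timezone-awareness.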
        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df


def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    double_precision: int = 15,
    geometry_format: str = 'geojson',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON layout passed through to `pandas.DataFrame.to_json()`.

    date_format: str, default 'iso'
        The default format for timestamps.
2084 2085 date_unit: str, default 'us' 2086 The precision of the timestamps. 2087 2088 double_precision: int, default 15 2089 The number of decimal places to use when encoding floating point values (maximum 15). 2090 2091 geometry_format: str, default 'geojson' 2092 The serialization format for geometry data. 2093 Accepted values are `geojson`, `wkb_hex`, and `wkt`. 2094 2095 Returns 2096 ------- 2097 A JSON string. 2098 """ 2099 import warnings 2100 import functools 2101 from meerschaum.utils.packages import import_pandas 2102 from meerschaum.utils.dtypes import ( 2103 serialize_bytes, 2104 serialize_decimal, 2105 serialize_geometry, 2106 serialize_date, 2107 ) 2108 pd = import_pandas() 2109 uuid_cols = get_uuid_cols(df) 2110 bytes_cols = get_bytes_cols(df) 2111 numeric_cols = get_numeric_cols(df) 2112 date_cols = get_date_cols(df) 2113 geometry_cols = get_geometry_cols(df) 2114 geometry_cols_srids = { 2115 col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0') 2116 for col in geometry_cols 2117 } if 'geodataframe' in str(type(df)).lower() else {} 2118 if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols): 2119 df = df.copy() 2120 if 'geodataframe' in str(type(df)).lower(): 2121 geometry_data = { 2122 col: df[col] 2123 for col in geometry_cols 2124 } 2125 df = pd.DataFrame({ 2126 col: df[col] 2127 for col in df.columns 2128 if col not in geometry_cols 2129 }) 2130 for col in geometry_cols: 2131 df[col] = pd.Series(ob for ob in geometry_data[col]) 2132 for col in uuid_cols: 2133 df[col] = df[col].astype(str) 2134 for col in bytes_cols: 2135 df[col] = df[col].apply(serialize_bytes) 2136 for col in numeric_cols: 2137 df[col] = df[col].apply(serialize_decimal) 2138 for col in date_cols: 2139 df[col] = df[col].apply(serialize_date) 2140 with warnings.catch_warnings(): 2141 warnings.simplefilter("ignore") 2142 for col in geometry_cols: 2143 srid = geometry_cols_srids.get(col, None) or None 2144 df[col] = pd.Series( 2145 serialize_geometry(val, geometry_format=geometry_format, srid=srid) 2146 for val in df[col] 2147 ) 2148 return df.infer_objects().fillna(pd.NA).to_json( 2149 date_format=date_format, 2150 date_unit=date_unit, 2151 double_precision=double_precision, 2152 orient=orient, 2153 **kwargs 2154 ) 2155 2156 2157def to_simple_lines(df: 'pd.DataFrame') -> str: 2158 """ 2159 Serialize a Pandas Dataframe as lines of simple dictionaries. 2160 2161 Parameters 2162 ---------- 2163 df: pd.DataFrame 2164 The dataframe to serialize into simple lines text. 2165 2166 Returns 2167 ------- 2168 A string of simple line dictionaries joined by newlines. 2169 """ 2170 from meerschaum.utils.misc import to_simple_dict 2171 if df is None or len(df) == 0: 2172 return '' 2173 2174 docs = df.to_dict(orient='records') 2175 return '\n'.join(to_simple_dict(doc) for doc in docs) 2176 2177 2178def parse_simple_lines(data: str) -> 'pd.DataFrame': 2179 """ 2180 Parse simple lines text into a DataFrame. 2181 2182 Parameters 2183 ---------- 2184 data: str 2185 The simple lines text to parse into a DataFrame. 2186 2187 Returns 2188 ------- 2189 A dataframe containing the rows serialized in `data`. 
2190 """ 2191 from meerschaum.utils.misc import string_to_dict 2192 from meerschaum.utils.packages import import_pandas 2193 pd = import_pandas() 2194 lines = data.splitlines() 2195 try: 2196 docs = [string_to_dict(line) for line in lines] 2197 df = pd.DataFrame(docs) 2198 except Exception: 2199 df = None 2200 2201 if df is None: 2202 raise ValueError("Cannot parse simple lines into a dataframe.") 2203 2204 return df
25def add_missing_cols_to_df( 26 df: 'pd.DataFrame', 27 dtypes: Dict[str, Any], 28) -> 'pd.DataFrame': 29 """ 30 Add columns from the dtypes dictionary as null columns to a new DataFrame. 31 32 Parameters 33 ---------- 34 df: pd.DataFrame 35 The dataframe we should copy and add null columns. 36 37 dtypes: 38 The data types dictionary which may contain keys not present in `df.columns`. 39 40 Returns 41 ------- 42 A new `DataFrame` with the keys from `dtypes` added as null columns. 43 If `df.dtypes` is the same as `dtypes`, then return a reference to `df`. 44 NOTE: This will not ensure that dtypes are enforced! 45 46 Examples 47 -------- 48 >>> import pandas as pd 49 >>> df = pd.DataFrame([{'a': 1}]) 50 >>> dtypes = {'b': 'Int64'} 51 >>> add_missing_cols_to_df(df, dtypes) 52 a b 53 0 1 <NA> 54 >>> add_missing_cols_to_df(df, dtypes).dtypes 55 a int64 56 b Int64 57 dtype: object 58 >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes 59 a int64 60 dtype: object 61 >>> 62 """ 63 if set(df.columns) == set(dtypes): 64 return df 65 66 from meerschaum.utils.packages import attempt_import 67 from meerschaum.utils.dtypes import to_pandas_dtype 68 pandas = attempt_import('pandas') 69 70 def build_series(dtype: str): 71 return pandas.Series([], dtype=to_pandas_dtype(dtype)) 72 73 assign_kwargs = { 74 str(col): build_series(str(typ)) 75 for col, typ in dtypes.items() 76 if col not in df.columns 77 } 78 df_with_cols = df.assign(**assign_kwargs) 79 for col in assign_kwargs: 80 df_with_cols[col] = df_with_cols[col].fillna(pandas.NA) 81 return df_with_cols
Add columns from the dtypes dictionary as null columns to a new DataFrame.
Parameters
- df (pd.DataFrame): The dataframe to copy and extend with null columns.
- dtypes (Dict[str, Any]): The data types dictionary, which may contain keys not present in `df.columns`.
Returns
- A new `DataFrame` with the keys from `dtypes` added as null columns.
- If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
- NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
a b
0 1 <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a int64
b Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a int64
dtype: object
>>>
84def filter_unseen_df( 85 old_df: 'pd.DataFrame', 86 new_df: 'pd.DataFrame', 87 safe_copy: bool = True, 88 dtypes: Optional[Dict[str, Any]] = None, 89 include_unchanged_columns: bool = False, 90 coerce_mixed_numerics: bool = True, 91 debug: bool = False, 92) -> 'pd.DataFrame': 93 """ 94 Left join two DataFrames to find the newest unseen data. 95 96 Parameters 97 ---------- 98 old_df: 'pd.DataFrame' 99 The original (target) dataframe. Acts as a filter on the `new_df`. 100 101 new_df: 'pd.DataFrame' 102 The fetched (source) dataframe. Rows that are contained in `old_df` are removed. 103 104 safe_copy: bool, default True 105 If `True`, create a copy before comparing and modifying the dataframes. 106 Setting to `False` may mutate the DataFrames. 107 108 dtypes: Optional[Dict[str, Any]], default None 109 Optionally specify the datatypes of the dataframe. 110 111 include_unchanged_columns: bool, default False 112 If `True`, include columns which haven't changed on rows which have changed. 113 114 coerce_mixed_numerics: bool, default True 115 If `True`, cast mixed integer and float columns between the old and new dataframes into 116 numeric values (`decimal.Decimal`). 117 118 debug: bool, default False 119 Verbosity toggle. 120 121 Returns 122 ------- 123 A pandas dataframe of the new, unseen rows in `new_df`. 124 125 Examples 126 -------- 127 ```python 128 >>> import pandas as pd 129 >>> df1 = pd.DataFrame({'a': [1,2]}) 130 >>> df2 = pd.DataFrame({'a': [2,3]}) 131 >>> filter_unseen_df(df1, df2) 132 a 133 0 3 134 135 ``` 136 137 """ 138 if old_df is None: 139 return new_df 140 141 if safe_copy: 142 old_df = old_df.copy() 143 new_df = new_df.copy() 144 145 import json 146 import functools 147 import traceback 148 from meerschaum.utils.warnings import warn 149 from meerschaum.utils.packages import import_pandas, attempt_import 150 from meerschaum.utils.dtypes import ( 151 to_pandas_dtype, 152 are_dtypes_equal, 153 attempt_cast_to_numeric, 154 attempt_cast_to_uuid, 155 attempt_cast_to_bytes, 156 attempt_cast_to_geometry, 157 coerce_timezone, 158 serialize_decimal, 159 ) 160 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 161 pd = import_pandas(debug=debug) 162 is_dask = 'dask' in new_df.__module__ 163 if is_dask: 164 pandas = attempt_import('pandas') 165 _ = attempt_import('partd', lazy=False) 166 dd = attempt_import('dask.dataframe') 167 merge = dd.merge 168 NA = pandas.NA 169 else: 170 merge = pd.merge 171 NA = pd.NA 172 173 new_df_dtypes = dict(new_df.dtypes) 174 old_df_dtypes = dict(old_df.dtypes) 175 176 same_cols = set(new_df.columns) == set(old_df.columns) 177 if not same_cols: 178 new_df = add_missing_cols_to_df(new_df, old_df_dtypes) 179 old_df = add_missing_cols_to_df(old_df, new_df_dtypes) 180 181 new_types_missing_from_old = { 182 col: typ 183 for col, typ in new_df_dtypes.items() 184 if col not in old_df_dtypes 185 } 186 old_types_missing_from_new = { 187 col: typ 188 for col, typ in new_df_dtypes.items() 189 if col not in old_df_dtypes 190 } 191 old_df_dtypes.update(new_types_missing_from_old) 192 new_df_dtypes.update(old_types_missing_from_new) 193 194 ### Edge case: two empty lists cast to DFs. 195 elif len(new_df.columns) == 0: 196 return new_df 197 198 try: 199 ### Order matters when checking equality. 200 new_df = new_df[old_df.columns] 201 202 except Exception as e: 203 warn( 204 "Was not able to cast old columns onto new DataFrame. " + 205 f"Are both DataFrames the same shape? 
Error:\n{e}", 206 stacklevel=3, 207 ) 208 return new_df[list(new_df_dtypes.keys())] 209 210 ### assume the old_df knows what it's doing, even if it's technically wrong. 211 if dtypes is None: 212 dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} 213 214 dtypes = { 215 col: to_pandas_dtype(typ) 216 for col, typ in dtypes.items() 217 if col in new_df_dtypes and col in old_df_dtypes 218 } 219 for col, typ in new_df_dtypes.items(): 220 if col not in dtypes: 221 dtypes[col] = typ 222 223 numeric_cols_precisions_scales = { 224 col: get_numeric_precision_scale(None, typ) 225 for col, typ in dtypes.items() 226 if col and str(typ).lower().startswith('numeric') 227 } 228 229 dt_dtypes = { 230 col: typ 231 for col, typ in dtypes.items() 232 if are_dtypes_equal(typ, 'datetime') 233 } 234 non_dt_dtypes = { 235 col: typ 236 for col, typ in dtypes.items() 237 if col not in dt_dtypes 238 } 239 240 cast_non_dt_cols = True 241 try: 242 new_df = new_df.astype(non_dt_dtypes) 243 cast_non_dt_cols = False 244 except Exception as e: 245 warn( 246 f"Was not able to cast the new DataFrame to the given dtypes.\n{e}" 247 ) 248 249 cast_dt_cols = True 250 try: 251 for col, typ in dt_dtypes.items(): 252 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 253 strip_utc = ( 254 _dtypes_col_dtype.startswith('datetime64') 255 and 'utc' not in _dtypes_col_dtype.lower() 256 ) 257 if col in old_df.columns: 258 old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc) 259 if col in new_df.columns: 260 new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc) 261 cast_dt_cols = False 262 except Exception as e: 263 warn(f"Could not cast datetime columns:\n{e}") 264 265 cast_cols = cast_dt_cols or cast_non_dt_cols 266 267 new_numeric_cols_existing = get_numeric_cols(new_df) 268 old_numeric_cols = get_numeric_cols(old_df) 269 for col, typ in {k: v for k, v in dtypes.items()}.items(): 270 if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')): 271 new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float') 272 new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int') 273 new_is_numeric = col in new_numeric_cols_existing 274 old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float') 275 old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int') 276 old_is_numeric = col in old_numeric_cols 277 278 if ( 279 coerce_mixed_numerics 280 and 281 (new_is_float or new_is_int or new_is_numeric) 282 and 283 (old_is_float or old_is_int or old_is_numeric) 284 ): 285 dtypes[col] = attempt_cast_to_numeric 286 cast_cols = True 287 continue 288 289 ### Fallback to object if the types don't match. 290 warn( 291 f"Detected different types for '{col}' " 292 + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), " 293 + "falling back to 'object'..." 
294 ) 295 dtypes[col] = 'object' 296 cast_cols = True 297 298 if cast_cols: 299 for col, dtype in dtypes.items(): 300 if col in new_df.columns: 301 try: 302 new_df[col] = ( 303 new_df[col].astype(dtype) 304 if not callable(dtype) 305 else new_df[col].apply(dtype) 306 ) 307 except Exception as e: 308 warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}") 309 310 serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str) 311 new_json_cols = get_json_cols(new_df) 312 old_json_cols = get_json_cols(old_df) 313 json_cols = set(new_json_cols + old_json_cols) 314 for json_col in old_json_cols: 315 old_df[json_col] = old_df[json_col].apply(serializer) 316 for json_col in new_json_cols: 317 new_df[json_col] = new_df[json_col].apply(serializer) 318 319 new_numeric_cols = get_numeric_cols(new_df) 320 numeric_cols = set(new_numeric_cols + old_numeric_cols) 321 for numeric_col in old_numeric_cols: 322 old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal) 323 for numeric_col in new_numeric_cols: 324 new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal) 325 326 old_dt_cols = [ 327 col 328 for col, typ in old_df.dtypes.items() 329 if are_dtypes_equal(str(typ), 'datetime') 330 ] 331 for col in old_dt_cols: 332 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 333 strip_utc = ( 334 _dtypes_col_dtype.startswith('datetime64') 335 and 'utc' not in _dtypes_col_dtype.lower() 336 ) 337 old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc) 338 339 new_dt_cols = [ 340 col 341 for col, typ in new_df.dtypes.items() 342 if are_dtypes_equal(str(typ), 'datetime') 343 ] 344 for col in new_dt_cols: 345 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 346 strip_utc = ( 347 _dtypes_col_dtype.startswith('datetime64') 348 and 'utc' not in _dtypes_col_dtype.lower() 349 ) 350 new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc) 351 352 old_uuid_cols = get_uuid_cols(old_df) 353 new_uuid_cols = get_uuid_cols(new_df) 354 uuid_cols = set(new_uuid_cols + old_uuid_cols) 355 356 old_bytes_cols = get_bytes_cols(old_df) 357 new_bytes_cols = get_bytes_cols(new_df) 358 bytes_cols = set(new_bytes_cols + old_bytes_cols) 359 360 old_geometry_cols = get_geometry_cols(old_df) 361 new_geometry_cols = get_geometry_cols(new_df) 362 geometry_cols = set(new_geometry_cols + old_geometry_cols) 363 364 na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$' 365 joined_df = merge( 366 new_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA), 367 old_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA), 368 how='left', 369 on=None, 370 indicator=True, 371 ) 372 changed_rows_mask = (joined_df['_merge'] == 'left_only') 373 new_cols = list(new_df_dtypes) 374 delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True) 375 376 delta_json_cols = get_json_cols(delta_df) 377 for json_col in json_cols: 378 if ( 379 json_col in delta_json_cols 380 or json_col not in delta_df.columns 381 ): 382 continue 383 try: 384 delta_df[json_col] = delta_df[json_col].apply( 385 lambda x: (json.loads(x) if isinstance(x, str) else x) 386 ) 387 except Exception: 388 warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}") 389 390 delta_numeric_cols = get_numeric_cols(delta_df) 391 for numeric_col in numeric_cols: 392 if ( 393 numeric_col in delta_numeric_cols 394 or numeric_col not in delta_df.columns 395 ): 396 continue 397 try: 398 delta_df[numeric_col] = delta_df[numeric_col].apply( 399 
functools.partial( 400 attempt_cast_to_numeric, 401 quantize=True, 402 precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0], 403 scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1], 404 ) 405 ) 406 except Exception: 407 warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}") 408 409 delta_uuid_cols = get_uuid_cols(delta_df) 410 for uuid_col in uuid_cols: 411 if ( 412 uuid_col in delta_uuid_cols 413 or uuid_col not in delta_df.columns 414 ): 415 continue 416 try: 417 delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid) 418 except Exception: 419 warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}") 420 421 delta_bytes_cols = get_bytes_cols(delta_df) 422 for bytes_col in bytes_cols: 423 if ( 424 bytes_col in delta_bytes_cols 425 or bytes_col not in delta_df.columns 426 ): 427 continue 428 try: 429 delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes) 430 except Exception: 431 warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}") 432 433 delta_geometry_cols = get_geometry_cols(delta_df) 434 for geometry_col in geometry_cols: 435 if ( 436 geometry_col in delta_geometry_cols 437 or geometry_col not in delta_df.columns 438 ): 439 continue 440 try: 441 delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col]) 442 except Exception: 443 warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}") 444 445 return delta_df
Left join two DataFrames to find the newest unseen data.
Parameters
- old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the `new_df`.
- new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames.
- dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- include_unchanged_columns (bool, default False): If `True`, include columns which haven't changed on rows which have changed.
- coerce_mixed_numerics (bool, default True): If `True`, cast mixed integer and float columns between the old and new dataframes into numeric values (`decimal.Decimal`).
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas dataframe of the new, unseen rows in `new_df`.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
a
0 3
448def parse_df_datetimes( 449 df: 'pd.DataFrame', 450 ignore_cols: Optional[Iterable[str]] = None, 451 strip_timezone: bool = False, 452 chunksize: Optional[int] = None, 453 dtype_backend: str = 'numpy_nullable', 454 ignore_all: bool = False, 455 precision_unit: Optional[str] = None, 456 coerce_utc: bool = True, 457 debug: bool = False, 458) -> 'pd.DataFrame': 459 """ 460 Parse a pandas DataFrame for datetime columns and cast as datetimes. 461 462 Parameters 463 ---------- 464 df: pd.DataFrame 465 The pandas DataFrame to parse. 466 467 ignore_cols: Optional[Iterable[str]], default None 468 If provided, do not attempt to coerce these columns as datetimes. 469 470 strip_timezone: bool, default False 471 If `True`, remove the UTC `tzinfo` property. 472 473 chunksize: Optional[int], default None 474 If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe. 475 476 dtype_backend: str, default 'numpy_nullable' 477 If `df` is not a DataFrame and new one needs to be constructed, 478 use this as the datatypes backend. 479 Accepted values are 'numpy_nullable' and 'pyarrow'. 480 481 ignore_all: bool, default False 482 If `True`, do not attempt to cast any columns to datetimes. 483 484 precision_unit: Optional[str], default None 485 If provided, enforce the given precision on the coerced datetime columns. 486 487 coerce_utc: bool, default True 488 Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`). 489 490 debug: bool, default False 491 Verbosity toggle. 492 493 Returns 494 ------- 495 A new pandas DataFrame with the determined datetime columns 496 (usually ISO strings) cast as datetimes. 497 498 Examples 499 -------- 500 ```python 501 >>> import pandas as pd 502 >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 503 >>> df.dtypes 504 a object 505 dtype: object 506 >>> df2 = parse_df_datetimes(df) 507 >>> df2.dtypes 508 a datetime64[us, UTC] 509 dtype: object 510 511 ``` 512 513 """ 514 from meerschaum.utils.packages import import_pandas, attempt_import 515 from meerschaum.utils.debug import dprint 516 from meerschaum.utils.warnings import warn 517 from meerschaum.utils.misc import items_str 518 from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES 519 import traceback 520 521 pd = import_pandas() 522 pandas = attempt_import('pandas') 523 pd_name = pd.__name__ 524 using_dask = 'dask' in pd_name 525 df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__) 526 dask_dataframe = None 527 if using_dask or df_is_dask: 528 npartitions = chunksize_to_npartitions(chunksize) 529 dask_dataframe = attempt_import('dask.dataframe') 530 531 ### if df is a dict, build DataFrame 532 if isinstance(df, pandas.DataFrame): 533 pdf = df 534 elif df_is_dask and isinstance(df, dask_dataframe.DataFrame): 535 pdf = get_first_valid_dask_partition(df) 536 else: 537 if debug: 538 dprint(f"df is of type '{type(df)}'. 
Building {pd.DataFrame}...") 539 540 if using_dask: 541 if isinstance(df, list): 542 keys = set() 543 for doc in df: 544 for key in doc: 545 keys.add(key) 546 df = pd.DataFrame.from_dict( 547 { 548 k: [ 549 doc.get(k, None) 550 for doc in df 551 ] for k in keys 552 }, 553 npartitions=npartitions, 554 ) 555 elif isinstance(df, dict): 556 df = pd.DataFrame.from_dict(df, npartitions=npartitions) 557 elif 'pandas.core.frame.DataFrame' in str(type(df)): 558 df = pd.from_pandas(df, npartitions=npartitions) 559 else: 560 raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.") 561 pandas = attempt_import('pandas') 562 pdf = get_first_valid_dask_partition(df) 563 564 else: 565 df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend) 566 pdf = df 567 568 ### skip parsing if DataFrame is empty 569 if len(pdf) == 0: 570 if debug: 571 dprint("df is empty. Returning original DataFrame without casting datetime columns...") 572 return df 573 574 ignore_cols = set( 575 (ignore_cols or []) + [ 576 col 577 for col, dtype in pdf.dtypes.items() 578 if 'datetime' in str(dtype) 579 ] 580 ) 581 cols_to_inspect = [ 582 col 583 for col in pdf.columns 584 if col not in ignore_cols 585 ] if not ignore_all else [] 586 587 if len(cols_to_inspect) == 0: 588 if debug: 589 dprint("All columns are ignored, skipping datetime detection...") 590 return df.infer_objects().fillna(pandas.NA) 591 592 ### apply regex to columns to determine which are ISO datetimes 593 iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+' 594 dt_mask = pdf[cols_to_inspect].astype(str).apply( 595 lambda s: s.str.match(iso_dt_regex).all() 596 ) 597 598 ### list of datetime column names 599 datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]] 600 if not datetime_cols: 601 if debug: 602 dprint("No columns detected as datetimes, returning...") 603 return df.infer_objects().fillna(pandas.NA) 604 605 if debug: 606 dprint("Converting columns to datetimes: " + str(datetime_cols)) 607 608 def _parse_to_datetime(x): 609 return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc) 610 611 try: 612 if not using_dask: 613 df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime) 614 else: 615 df[datetime_cols] = df[datetime_cols].apply( 616 _parse_to_datetime, 617 utc=True, 618 axis=1, 619 meta={ 620 col: MRSM_PD_DTYPES['datetime'] 621 for col in datetime_cols 622 } 623 ) 624 except Exception: 625 warn( 626 f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n" 627 + f"{traceback.format_exc()}" 628 ) 629 630 if strip_timezone: 631 for dt in datetime_cols: 632 try: 633 df[dt] = df[dt].dt.tz_localize(None) 634 except Exception: 635 warn( 636 f"Unable to convert column '{dt}' to naive datetime:\n" 637 + f"{traceback.format_exc()}" 638 ) 639 640 return df.fillna(pandas.NA)
Parse a pandas DataFrame for datetime columns and cast as datetimes.
Parameters
- df (pd.DataFrame): The pandas DataFrame to parse.
- ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
- strip_timezone (bool, default False): If `True`, remove the UTC `tzinfo` property.
- chunksize (Optional[int], default None): If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
- dtype_backend (str, default 'numpy_nullable'): If `df` is not a DataFrame and a new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
- ignore_all (bool, default False): If `True`, do not attempt to cast any columns to datetimes.
- precision_unit (Optional[str], default None): If provided, enforce the given precision on the coerced datetime columns.
- coerce_utc (bool, default True): Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
- debug (bool, default False): Verbosity toggle.
Returns
- A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> df.dtypes
a object
dtype: object
>>> df2 = parse_df_datetimes(df)
>>> df2.dtypes
a datetime64[us, UTC]
dtype: object
643def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]: 644 """ 645 Get the columns which contain unhashable objects from a Pandas DataFrame. 646 647 Parameters 648 ---------- 649 df: pd.DataFrame 650 The DataFrame which may contain unhashable objects. 651 652 Returns 653 ------- 654 A list of columns. 655 """ 656 if df is None: 657 return [] 658 if len(df) == 0: 659 return [] 660 661 is_dask = 'dask' in df.__module__ 662 if is_dask: 663 from meerschaum.utils.packages import attempt_import 664 pandas = attempt_import('pandas') 665 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 666 return [ 667 col for col, val in df.iloc[0].items() 668 if not isinstance(val, Hashable) 669 ]
Get the columns which contain unhashable objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns.
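Note that only the first row is inspected, so the check is a heuristic. A minimal sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame([{'a': 1, 'b': {'x': 1}, 'c': [1, 2]}])
print(get_unhashable_cols(df))
# ['b', 'c']
```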
672def get_json_cols(df: 'pd.DataFrame') -> List[str]: 673 """ 674 Get the columns which contain unhashable objects from a Pandas DataFrame. 675 676 Parameters 677 ---------- 678 df: pd.DataFrame 679 The DataFrame which may contain unhashable objects. 680 681 Returns 682 ------- 683 A list of columns to be encoded as JSON. 684 """ 685 if df is None: 686 return [] 687 688 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 689 if is_dask: 690 df = get_first_valid_dask_partition(df) 691 692 if len(df) == 0: 693 return [] 694 695 cols_indices = { 696 col: df[col].first_valid_index() 697 for col in df.columns 698 } 699 return [ 700 col 701 for col, ix in cols_indices.items() 702 if ( 703 ix is not None 704 and isinstance(df.loc[ix][col], (dict, list)) 705 ) 706 ]
Get the columns which contain JSON-encodable objects (dictionaries or lists) from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain dictionaries or lists.
Returns
- A list of columns to be encoded as JSON.
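A minimal sketch (columns whose first valid value is a `dict` or `list` are flagged):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame([{'a': 1, 'meta': {'key': 'val'}, 'tags': ['x', 'y']}])
print(get_json_cols(df))
# ['meta', 'tags']
```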
709def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 710 """ 711 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 712 713 Parameters 714 ---------- 715 df: pd.DataFrame 716 The DataFrame which may contain decimal objects. 717 718 Returns 719 ------- 720 A list of columns to treat as numerics. 721 """ 722 if df is None: 723 return [] 724 from decimal import Decimal 725 is_dask = 'dask' in df.__module__ 726 if is_dask: 727 df = get_first_valid_dask_partition(df) 728 729 if len(df) == 0: 730 return [] 731 732 cols_indices = { 733 col: df[col].first_valid_index() 734 for col in df.columns 735 } 736 return [ 737 col 738 for col, ix in cols_indices.items() 739 if ( 740 ix is not None 741 and 742 isinstance(df.loc[ix][col], Decimal) 743 ) 744 ]
Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
- A list of columns to treat as numerics.
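A minimal sketch:

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('1.23'), Decimal('4.56')], 'qty': [1, 2]})
print(get_numeric_cols(df))
# ['price']
```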
747def get_bool_cols(df: 'pd.DataFrame') -> List[str]: 748 """ 749 Get the columns which contain `bool` objects from a Pandas DataFrame. 750 751 Parameters 752 ---------- 753 df: pd.DataFrame 754 The DataFrame which may contain bools. 755 756 Returns 757 ------- 758 A list of columns to treat as bools. 759 """ 760 if df is None: 761 return [] 762 763 is_dask = 'dask' in df.__module__ 764 if is_dask: 765 df = get_first_valid_dask_partition(df) 766 767 if len(df) == 0: 768 return [] 769 770 from meerschaum.utils.dtypes import are_dtypes_equal 771 772 return [ 773 col 774 for col, typ in df.dtypes.items() 775 if are_dtypes_equal(str(typ), 'bool') 776 ]
Get the columns which contain bool objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bools.
Returns
- A list of columns to treat as bools.
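A minimal sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_bool_cols

df = pd.DataFrame({'flag': [True, False], 'n': [1, 2]})
print(get_bool_cols(df))
# ['flag']
```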
779def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 780 """ 781 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 782 783 Parameters 784 ---------- 785 df: pd.DataFrame 786 The DataFrame which may contain UUID objects. 787 788 Returns 789 ------- 790 A list of columns to treat as UUIDs. 791 """ 792 if df is None: 793 return [] 794 from uuid import UUID 795 is_dask = 'dask' in df.__module__ 796 if is_dask: 797 df = get_first_valid_dask_partition(df) 798 799 if len(df) == 0: 800 return [] 801 802 cols_indices = { 803 col: df[col].first_valid_index() 804 for col in df.columns 805 } 806 return [ 807 col 808 for col, ix in cols_indices.items() 809 if ( 810 ix is not None 811 and 812 isinstance(df.loc[ix][col], UUID) 813 ) 814 ]
Get the columns which contain uuid.UUID objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
- A list of columns to treat as UUIDs.
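A minimal sketch:

```python
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'id': [uuid4(), uuid4()], 'name': ['a', 'b']})
print(get_uuid_cols(df))
# ['id']
```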
817def get_datetime_cols( 818 df: 'pd.DataFrame', 819 timezone_aware: bool = True, 820 timezone_naive: bool = True, 821 with_tz_precision: bool = False, 822) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]: 823 """ 824 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 825 826 Parameters 827 ---------- 828 df: pd.DataFrame 829 The DataFrame which may contain `datetime` or `Timestamp` objects. 830 831 timezone_aware: bool, default True 832 If `True`, include timezone-aware datetime columns. 833 834 timezone_naive: bool, default True 835 If `True`, include timezone-naive datetime columns. 836 837 with_tz_precision: bool, default False 838 If `True`, return a dictionary mapping column names to tuples in the form 839 `(timezone, precision)`. 840 841 Returns 842 ------- 843 A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples 844 (if `with_tz_precision` is `True`). 845 """ 846 if not timezone_aware and not timezone_naive: 847 raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.") 848 849 if df is None: 850 return [] if not with_tz_precision else {} 851 852 from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES 853 is_dask = 'dask' in df.__module__ 854 if is_dask: 855 df = get_first_valid_dask_partition(df) 856 857 def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]: 858 """ 859 Extract the tz + precision tuple from a dtype string. 860 """ 861 meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '') 862 tz = ( 863 None 864 if ',' not in meta_str 865 else meta_str.split(',', maxsplit=1)[-1] 866 ) 867 precision_abbreviation = ( 868 meta_str 869 if ',' not in meta_str 870 else meta_str.split(',')[0] 871 ) 872 precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation] 873 return tz, precision 874 875 def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]: 876 """ 877 Return the tz + precision tuple from a Python datetime object. 
878 """ 879 return dt.tzname(), 'microsecond' 880 881 known_dt_cols_types = { 882 col: str(typ) 883 for col, typ in df.dtypes.items() 884 if are_dtypes_equal('datetime', str(typ)) 885 } 886 887 known_dt_cols_tuples = { 888 col: get_tz_precision_from_dtype(typ) 889 for col, typ in known_dt_cols_types.items() 890 } 891 892 if len(df) == 0: 893 return ( 894 list(known_dt_cols_types) 895 if not with_tz_precision 896 else known_dt_cols_tuples 897 ) 898 899 cols_indices = { 900 col: df[col].first_valid_index() 901 for col in df.columns 902 if col not in known_dt_cols_types 903 } 904 pydt_cols_tuples = { 905 col: get_tz_precision_from_datetime(sample_val) 906 for col, ix in cols_indices.items() 907 if ( 908 ix is not None 909 and 910 isinstance((sample_val := df.loc[ix][col]), datetime) 911 ) 912 } 913 914 dt_cols_tuples = { 915 **known_dt_cols_tuples, 916 **pydt_cols_tuples 917 } 918 919 all_dt_cols_tuples = { 920 col: dt_cols_tuples[col] 921 for col in df.columns 922 if col in dt_cols_tuples 923 } 924 if timezone_aware and timezone_naive: 925 return ( 926 list(all_dt_cols_tuples) 927 if not with_tz_precision 928 else all_dt_cols_tuples 929 ) 930 931 known_timezone_aware_dt_cols = [ 932 col 933 for col in known_dt_cols_types 934 if getattr(df[col], 'tz', None) is not None 935 ] 936 timezone_aware_pydt_cols_tuples = { 937 col: (tz, precision) 938 for col, (tz, precision) in pydt_cols_tuples.items() 939 if df.loc[cols_indices[col]][col].tzinfo is not None 940 } 941 timezone_aware_dt_cols_set = set( 942 known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples) 943 ) 944 timezone_aware_cols_tuples = { 945 col: (tz, precision) 946 for col, (tz, precision) in all_dt_cols_tuples.items() 947 if col in timezone_aware_dt_cols_set 948 } 949 timezone_naive_cols_tuples = { 950 col: (tz, precision) 951 for col, (tz, precision) in all_dt_cols_tuples.items() 952 if col not in timezone_aware_dt_cols_set 953 } 954 955 if timezone_aware: 956 return ( 957 list(timezone_aware_cols_tuples) 958 if not with_tz_precision 959 else timezone_aware_cols_tuples 960 ) 961 962 return ( 963 list(timezone_naive_cols_tuples) 964 if not with_tz_precision 965 else timezone_naive_cols_tuples 966 )
Get the columns which contain datetime or Timestamp objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain `datetime` or `Timestamp` objects.
- timezone_aware (bool, default True): If `True`, include timezone-aware datetime columns.
- timezone_naive (bool, default True): If `True`, include timezone-naive datetime columns.
- with_tz_precision (bool, default False): If `True`, return a dictionary mapping column names to tuples in the form `(timezone, precision)`.
Returns
- A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples (if `with_tz_precision` is `True`).
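A minimal sketch (the tz and precision values in the `with_tz_precision` mapping depend on the dtypes pandas infers, so the second output is illustrative):

```python
from datetime import datetime, timezone
import pandas as pd
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({
    'utc_dt': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
    'naive_dt': [datetime(2025, 1, 1)],
})
print(get_datetime_cols(df))
# ['utc_dt', 'naive_dt']
print(get_datetime_cols(df, with_tz_precision=True))
# e.g. {'utc_dt': ('UTC', 'nanosecond'), 'naive_dt': (None, 'nanosecond')}
```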
969def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 970 """ 971 Return a dictionary mapping datetime columns to specific types strings. 972 973 Parameters 974 ---------- 975 df: pd.DataFrame 976 The DataFrame which may contain datetime columns. 977 978 Returns 979 ------- 980 A dictionary mapping the datetime columns' names to dtype strings 981 (containing timezone and precision metadata). 982 983 Examples 984 -------- 985 >>> from datetime import datetime, timezone 986 >>> import pandas as pd 987 >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]}) 988 >>> get_datetime_cols_types(df) 989 {'dt_tz_aware': 'datetime64[us, UTC]'} 990 >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]}) 991 >>> get_datetime_cols_types(df) 992 {'distant_dt': 'datetime64[us]'} 993 >>> df = pd.DataFrame({'dt_second': datetime(2025, 1, 1)}) 994 >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]') 995 >>> get_datetime_cols_types(df) 996 {'dt_second': 'datetime64[s]'} 997 """ 998 from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS 999 dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True) 1000 if not dt_cols_tuples: 1001 return {} 1002 1003 return { 1004 col: ( 1005 f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]" 1006 if tz is None 1007 else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]" 1008 ) 1009 for col, (tz, precision) in dt_cols_tuples.items() 1010 }
Return a dictionary mapping datetime columns to specific types strings.
Parameters
- df (pd.DataFrame): The DataFrame which may contain datetime columns.
Returns
- A dictionary mapping the datetime columns' names to dtype strings (containing timezone and precision metadata).
Examples
>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
>>> get_datetime_cols_types(df)
{'dt_tz_aware': 'datetime64[us, UTC]'}
>>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
>>> get_datetime_cols_types(df)
{'distant_dt': 'datetime64[us]'}
>>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
>>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
>>> get_datetime_cols_types(df)
{'dt_second': 'datetime64[s]'}
1013def get_date_cols(df: 'pd.DataFrame') -> List[str]: 1014 """ 1015 Get the `date` columns from a Pandas DataFrame. 1016 1017 Parameters 1018 ---------- 1019 df: pd.DataFrame 1020 The DataFrame which may contain dates. 1021 1022 Returns 1023 ------- 1024 A list of columns to treat as dates. 1025 """ 1026 from meerschaum.utils.dtypes import are_dtypes_equal 1027 if df is None: 1028 return [] 1029 1030 is_dask = 'dask' in df.__module__ 1031 if is_dask: 1032 df = get_first_valid_dask_partition(df) 1033 1034 known_date_cols = [ 1035 col 1036 for col, typ in df.dtypes.items() 1037 if are_dtypes_equal(typ, 'date') 1038 ] 1039 1040 if len(df) == 0: 1041 return known_date_cols 1042 1043 cols_indices = { 1044 col: df[col].first_valid_index() 1045 for col in df.columns 1046 if col not in known_date_cols 1047 } 1048 object_date_cols = [ 1049 col 1050 for col, ix in cols_indices.items() 1051 if ( 1052 ix is not None 1053 and isinstance(df.loc[ix][col], date) 1054 ) 1055 ] 1056 1057 all_date_cols = set(known_date_cols + object_date_cols) 1058 1059 return [ 1060 col 1061 for col in df.columns 1062 if col in all_date_cols 1063 ]
Get the date columns from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain dates.
Returns
- A list of columns to treat as dates.
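A minimal sketch:

```python
from datetime import date
import pandas as pd
from meerschaum.utils.dataframe import get_date_cols

df = pd.DataFrame({'day': [date(2025, 1, 1)], 'n': [1]})
print(get_date_cols(df))
# ['day']
```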
1066def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 1067 """ 1068 Get the columns which contain bytes strings from a Pandas DataFrame. 1069 1070 Parameters 1071 ---------- 1072 df: pd.DataFrame 1073 The DataFrame which may contain bytes strings. 1074 1075 Returns 1076 ------- 1077 A list of columns to treat as bytes. 1078 """ 1079 if df is None: 1080 return [] 1081 1082 is_dask = 'dask' in df.__module__ 1083 if is_dask: 1084 df = get_first_valid_dask_partition(df) 1085 1086 known_bytes_cols = [ 1087 col 1088 for col, typ in df.dtypes.items() 1089 if str(typ) == 'binary[pyarrow]' 1090 ] 1091 1092 if len(df) == 0: 1093 return known_bytes_cols 1094 1095 cols_indices = { 1096 col: df[col].first_valid_index() 1097 for col in df.columns 1098 if col not in known_bytes_cols 1099 } 1100 object_bytes_cols = [ 1101 col 1102 for col, ix in cols_indices.items() 1103 if ( 1104 ix is not None 1105 and isinstance(df.loc[ix][col], bytes) 1106 ) 1107 ] 1108 1109 all_bytes_cols = set(known_bytes_cols + object_bytes_cols) 1110 1111 return [ 1112 col 1113 for col in df.columns 1114 if col in all_bytes_cols 1115 ]
Get the columns which contain bytes strings from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
- A list of columns to treat as bytes.
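A minimal sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_bytes_cols

df = pd.DataFrame({'payload': [b'\x00\x01'], 'name': ['a']})
print(get_bytes_cols(df))
# ['payload']
```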
1118def get_geometry_cols( 1119 df: 'pd.DataFrame', 1120 with_types_srids: bool = False, 1121) -> Union[List[str], Dict[str, Any]]: 1122 """ 1123 Get the columns which contain shapely objects from a Pandas DataFrame. 1124 1125 Parameters 1126 ---------- 1127 df: pd.DataFrame 1128 The DataFrame which may contain bytes strings. 1129 1130 with_types_srids: bool, default False 1131 If `True`, return a dictionary mapping columns to geometry types and SRIDs. 1132 1133 Returns 1134 ------- 1135 A list of columns to treat as `geometry`. 1136 If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID). 1137 """ 1138 if df is None: 1139 return [] if not with_types_srids else {} 1140 1141 is_dask = 'dask' in df.__module__ 1142 if is_dask: 1143 df = get_first_valid_dask_partition(df) 1144 1145 if len(df) == 0: 1146 return [] if not with_types_srids else {} 1147 1148 cols_indices = { 1149 col: df[col].first_valid_index() 1150 for col in df.columns 1151 } 1152 geo_cols = [ 1153 col 1154 for col, ix in cols_indices.items() 1155 if ( 1156 ix is not None 1157 and 1158 'shapely' in str(type(df.loc[ix][col])) 1159 ) 1160 ] 1161 if not with_types_srids: 1162 return geo_cols 1163 1164 gpd = mrsm.attempt_import('geopandas', lazy=False) 1165 geo_cols_types_srids = {} 1166 for col in geo_cols: 1167 try: 1168 sample_geo_series = gpd.GeoSeries(df[col], crs=None) 1169 geometry_types = { 1170 geom.geom_type 1171 for geom in sample_geo_series 1172 if hasattr(geom, 'geom_type') 1173 } 1174 geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series) 1175 srid = ( 1176 ( 1177 sample_geo_series.crs.sub_crs_list[0].to_epsg() 1178 if sample_geo_series.crs.is_compound 1179 else sample_geo_series.crs.to_epsg() 1180 ) 1181 if sample_geo_series.crs 1182 else 0 1183 ) 1184 geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry' 1185 if geometry_type != 'geometry' and geometry_has_z: 1186 geometry_type = geometry_type + 'Z' 1187 except Exception: 1188 srid = 0 1189 geometry_type = 'geometry' 1190 geo_cols_types_srids[col] = (geometry_type, srid) 1191 1192 return geo_cols_types_srids
Get the columns which contain shapely objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain shapely geometry objects.
- with_types_srids (bool, default False): If `True`, return a dictionary mapping columns to geometry types and SRIDs.
Returns
- A list of columns to treat as `geometry`.
- If `with_types_srids`, return a dictionary mapping columns to tuples in the form `(type, SRID)`.
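A minimal sketch, assuming `shapely` is installed (and `geopandas` for type/SRID detection when `with_types_srids=True`):

```python
import pandas as pd
from shapely.geometry import Point
from meerschaum.utils.dataframe import get_geometry_cols

df = pd.DataFrame({'geom': [Point(0.0, 0.0)], 'n': [1]})
print(get_geometry_cols(df))
# ['geom']
print(get_geometry_cols(df, with_types_srids=True))
# e.g. {'geom': ('Point', 0)} (SRID falls back to 0 when no CRS is set)
```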
1195def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 1196 """ 1197 Return a dtypes dictionary mapping columns to specific geometry types (type, srid). 1198 """ 1199 geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True) 1200 new_cols_types = {} 1201 for col, (geometry_type, srid) in geometry_cols_types_srids.items(): 1202 new_dtype = "geometry" 1203 modifier = "" 1204 if not srid and geometry_type.lower() == 'geometry': 1205 new_cols_types[col] = new_dtype 1206 continue 1207 1208 modifier = "[" 1209 if geometry_type.lower() != 'geometry': 1210 modifier += f"{geometry_type}" 1211 1212 if srid: 1213 if modifier != '[': 1214 modifier += ", " 1215 modifier += f"{srid}" 1216 modifier += "]" 1217 new_cols_types[col] = f"{new_dtype}{modifier}" 1218 return new_cols_types
Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
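Building on `get_geometry_cols`, this produces dtype strings suitable for a dtypes dictionary. A minimal sketch (assuming `shapely` and `geopandas` are installed):

```python
import pandas as pd
from shapely.geometry import Point
from meerschaum.utils.dataframe import get_geometry_cols_types

df = pd.DataFrame({'geom': [Point(0.0, 0.0)]})
print(get_geometry_cols_types(df))
# e.g. {'geom': 'geometry[Point]'}
```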
1221def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]: 1222 """ 1223 Return a dtypes dictionary mapping special columns to their dtypes. 1224 """ 1225 return { 1226 **{col: 'json' for col in get_json_cols(df)}, 1227 **{col: 'uuid' for col in get_uuid_cols(df)}, 1228 **{col: 'bytes' for col in get_bytes_cols(df)}, 1229 **{col: 'bool' for col in get_bool_cols(df)}, 1230 **{col: 'numeric' for col in get_numeric_cols(df)}, 1231 **{col: 'date' for col in get_date_cols(df)}, 1232 **get_datetime_cols_types(df), 1233 **get_geometry_cols_types(df), 1234 }
Return a dtypes dictionary mapping special columns to their dtypes.
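A minimal sketch combining several of the detectors above:

```python
from decimal import Decimal
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_special_cols

df = pd.DataFrame([{'id': uuid4(), 'price': Decimal('9.99'), 'meta': {'k': 'v'}}])
print(get_special_cols(df))
# e.g. {'meta': 'json', 'id': 'uuid', 'price': 'numeric'}
```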
1237def enforce_dtypes( 1238 df: 'pd.DataFrame', 1239 dtypes: Dict[str, str], 1240 explicit_dtypes: Optional[Dict[str, str]] = None, 1241 safe_copy: bool = True, 1242 coerce_numeric: bool = False, 1243 coerce_timezone: bool = True, 1244 strip_timezone: bool = False, 1245 debug: bool = False, 1246) -> 'pd.DataFrame': 1247 """ 1248 Enforce the `dtypes` dictionary on a DataFrame. 1249 1250 Parameters 1251 ---------- 1252 df: pd.DataFrame 1253 The DataFrame on which to enforce dtypes. 1254 1255 dtypes: Dict[str, str] 1256 The data types to attempt to enforce on the DataFrame. 1257 1258 explicit_dtypes: Optional[Dict[str, str]], default None 1259 If provided, automatic dtype coersion will respect explicitly configured 1260 dtypes (`int`, `float`, `numeric`). 1261 1262 safe_copy: bool, default True 1263 If `True`, create a copy before comparing and modifying the dataframes. 1264 Setting to `False` may mutate the DataFrames. 1265 See `meerschaum.utils.dataframe.filter_unseen_df`. 1266 1267 coerce_numeric: bool, default False 1268 If `True`, convert float and int collisions to numeric. 1269 1270 coerce_timezone: bool, default True 1271 If `True`, convert datetimes to UTC. 1272 1273 strip_timezone: bool, default False 1274 If `coerce_timezone` and `strip_timezone` are `True`, 1275 remove timezone information from datetimes. 1276 1277 debug: bool, default False 1278 Verbosity toggle. 1279 1280 Returns 1281 ------- 1282 The Pandas DataFrame with the types enforced. 1283 """ 1284 import json 1285 import functools 1286 from meerschaum.utils.debug import dprint 1287 from meerschaum.utils.formatting import pprint 1288 from meerschaum.utils.dtypes import ( 1289 are_dtypes_equal, 1290 to_pandas_dtype, 1291 is_dtype_numeric, 1292 attempt_cast_to_numeric, 1293 attempt_cast_to_uuid, 1294 attempt_cast_to_bytes, 1295 attempt_cast_to_geometry, 1296 coerce_timezone as _coerce_timezone, 1297 get_geometry_type_srid, 1298 ) 1299 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 1300 pandas = mrsm.attempt_import('pandas') 1301 is_dask = 'dask' in df.__module__ 1302 if safe_copy: 1303 df = df.copy() 1304 if len(df.columns) == 0: 1305 if debug: 1306 dprint("Incoming DataFrame has no columns. 
Skipping enforcement...") 1307 return df 1308 1309 explicit_dtypes = explicit_dtypes or {} 1310 pipe_pandas_dtypes = { 1311 col: to_pandas_dtype(typ) 1312 for col, typ in dtypes.items() 1313 } 1314 json_cols = [ 1315 col 1316 for col, typ in dtypes.items() 1317 if typ == 'json' 1318 ] 1319 numeric_cols = [ 1320 col 1321 for col, typ in dtypes.items() 1322 if typ.startswith('numeric') 1323 ] 1324 geometry_cols_types_srids = { 1325 col: get_geometry_type_srid(typ, default_srid=0) 1326 for col, typ in dtypes.items() 1327 if typ.startswith('geometry') or typ.startswith('geography') 1328 } 1329 uuid_cols = [ 1330 col 1331 for col, typ in dtypes.items() 1332 if typ == 'uuid' 1333 ] 1334 bytes_cols = [ 1335 col 1336 for col, typ in dtypes.items() 1337 if typ == 'bytes' 1338 ] 1339 datetime_cols = [ 1340 col 1341 for col, typ in dtypes.items() 1342 if are_dtypes_equal(typ, 'datetime') 1343 ] 1344 df_numeric_cols = get_numeric_cols(df) 1345 if debug: 1346 dprint("Desired data types:") 1347 pprint(dtypes) 1348 dprint("Data types for incoming DataFrame:") 1349 pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()}) 1350 1351 if json_cols and len(df) > 0: 1352 if debug: 1353 dprint(f"Checking columns for JSON encoding: {json_cols}") 1354 for col in json_cols: 1355 if col in df.columns: 1356 try: 1357 df[col] = df[col].apply( 1358 ( 1359 lambda x: ( 1360 json.loads(x) 1361 if isinstance(x, str) 1362 else x 1363 ) 1364 ) 1365 ) 1366 except Exception as e: 1367 if debug: 1368 dprint(f"Unable to parse column '{col}' as JSON:\n{e}") 1369 1370 if numeric_cols: 1371 if debug: 1372 dprint(f"Checking for numerics: {numeric_cols}") 1373 for col in numeric_cols: 1374 precision, scale = get_numeric_precision_scale(None, dtypes.get(col, '')) 1375 if col in df.columns: 1376 try: 1377 df[col] = df[col].apply( 1378 functools.partial( 1379 attempt_cast_to_numeric, 1380 quantize=True, 1381 precision=precision, 1382 scale=scale, 1383 ) 1384 ) 1385 except Exception as e: 1386 if debug: 1387 dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}") 1388 1389 if uuid_cols: 1390 if debug: 1391 dprint(f"Checking for UUIDs: {uuid_cols}") 1392 for col in uuid_cols: 1393 if col in df.columns: 1394 try: 1395 df[col] = df[col].apply(attempt_cast_to_uuid) 1396 except Exception as e: 1397 if debug: 1398 dprint(f"Unable to parse column '{col}' as UUID:\n{e}") 1399 1400 if bytes_cols: 1401 if debug: 1402 dprint(f"Checking for bytes: {bytes_cols}") 1403 for col in bytes_cols: 1404 if col in df.columns: 1405 try: 1406 df[col] = df[col].apply(attempt_cast_to_bytes) 1407 except Exception as e: 1408 if debug: 1409 dprint(f"Unable to parse column '{col}' as bytes:\n{e}") 1410 1411 if datetime_cols and coerce_timezone: 1412 if debug: 1413 dprint(f"Checking for datetime conversion: {datetime_cols}") 1414 for col in datetime_cols: 1415 if col in df.columns: 1416 if not strip_timezone and 'utc' in str(df.dtypes[col]).lower(): 1417 if debug: 1418 dprint(f"Skip UTC coersion for column '{col}' ({str(df[col].dtype)}).") 1419 continue 1420 if strip_timezone and ',' not in str(df.dtypes[col]): 1421 if debug: 1422 dprint( 1423 f"Skip UTC coersion (stripped) for column '{col}' " 1424 f"({str(df[col].dtype)})." 
1425 ) 1426 continue 1427 1428 if debug: 1429 dprint( 1430 f"Data type for column '{col}' before timezone coersion: " 1431 f"{str(df[col].dtype)}" 1432 ) 1433 1434 df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone) 1435 if debug: 1436 dprint( 1437 f"Data type for column '{col}' after timezone coersion: " 1438 f"{str(df[col].dtype)}" 1439 ) 1440 1441 if geometry_cols_types_srids: 1442 geopandas = mrsm.attempt_import('geopandas') 1443 if debug: 1444 dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}") 1445 parsed_geom_cols = [] 1446 for col in geometry_cols_types_srids: 1447 if col not in df.columns: 1448 continue 1449 try: 1450 df[col] = attempt_cast_to_geometry(df[col]) 1451 parsed_geom_cols.append(col) 1452 except Exception as e: 1453 import traceback 1454 traceback.print_exc() 1455 if debug: 1456 dprint(f"Unable to parse column '{col}' as geometry:\n{e}") 1457 1458 if parsed_geom_cols: 1459 if debug: 1460 dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...") 1461 try: 1462 _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]] 1463 df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid) 1464 for col, (_, srid) in geometry_cols_types_srids.items(): 1465 if srid: 1466 if debug: 1467 dprint(f"Setting '{col}' to SRID '{srid}'...") 1468 _ = df[col].set_crs(srid) 1469 if parsed_geom_cols[0] not in df.columns: 1470 df.rename_geometry(parsed_geom_cols[0], inplace=True) 1471 except (ValueError, TypeError): 1472 import traceback 1473 dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}") 1474 1475 df_dtypes = {c: str(t) for c, t in df.dtypes.items()} 1476 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1477 if debug: 1478 dprint("Data types match. Exiting enforcement...") 1479 return df 1480 1481 common_dtypes = {} 1482 common_diff_dtypes = {} 1483 for col, typ in pipe_pandas_dtypes.items(): 1484 if col in df_dtypes: 1485 common_dtypes[col] = typ 1486 if not are_dtypes_equal(typ, df_dtypes[col]): 1487 common_diff_dtypes[col] = df_dtypes[col] 1488 1489 if debug: 1490 dprint("Common columns with different dtypes:") 1491 pprint(common_diff_dtypes) 1492 1493 detected_dt_cols = {} 1494 for col, typ in common_diff_dtypes.items(): 1495 if 'datetime' in typ and 'datetime' in common_dtypes[col]: 1496 df_dtypes[col] = typ 1497 detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col]) 1498 for col in detected_dt_cols: 1499 del common_diff_dtypes[col] 1500 1501 if debug: 1502 dprint("Common columns with different dtypes (after dates):") 1503 pprint(common_diff_dtypes) 1504 1505 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1506 if debug: 1507 dprint( 1508 "The incoming DataFrame has mostly the same types, skipping enforcement." 1509 + "The only detected difference was in the following datetime columns." 
1510 ) 1511 pprint(detected_dt_cols) 1512 return df 1513 1514 for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items(): 1515 previous_typ = common_dtypes[col] 1516 mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ)) 1517 explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float') 1518 explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int') 1519 explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric') 1520 all_nan = ( 1521 df[col].isnull().all() 1522 if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int) 1523 else None 1524 ) 1525 cast_to_numeric = explicitly_numeric or ( 1526 ( 1527 col in df_numeric_cols 1528 or ( 1529 mixed_numeric_types 1530 and not (explicitly_float or explicitly_int) 1531 and not all_nan 1532 and coerce_numeric 1533 ) 1534 ) 1535 ) 1536 1537 if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types): 1538 from meerschaum.utils.formatting import make_header 1539 msg = ( 1540 make_header(f"Coercing column '{col}' to numeric:", left_pad=0) 1541 + "\n" 1542 + f" Previous type: {previous_typ}\n" 1543 + f" Current type: {typ if col not in df_numeric_cols else 'Decimal'}" 1544 + ("\n Column is explicitly numeric." if explicitly_numeric else "") 1545 ) if cast_to_numeric else ( 1546 f"Will not coerce column '{col}' to numeric.\n" 1547 f" Numeric columns in dataframe: {df_numeric_cols}\n" 1548 f" Mixed numeric types: {mixed_numeric_types}\n" 1549 f" Explicitly float: {explicitly_float}\n" 1550 f" Explicitly int: {explicitly_int}\n" 1551 f" All NaN: {all_nan}\n" 1552 f" Coerce numeric: {coerce_numeric}" 1553 ) 1554 dprint(msg) 1555 1556 if cast_to_numeric: 1557 common_dtypes[col] = attempt_cast_to_numeric 1558 common_diff_dtypes[col] = attempt_cast_to_numeric 1559 1560 for d in common_diff_dtypes: 1561 t = common_dtypes[d] 1562 if debug: 1563 dprint(f"Casting column {d} to dtype {t}.") 1564 try: 1565 df[d] = ( 1566 df[d].apply(t) 1567 if callable(t) 1568 else df[d].astype(t) 1569 ) 1570 except Exception as e: 1571 if debug: 1572 dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}") 1573 if 'int' in str(t).lower(): 1574 try: 1575 df[d] = df[d].astype('float64').astype(t) 1576 except Exception: 1577 if debug: 1578 dprint(f"Was unable to convert to float then {t}.") 1579 return df
Enforce the dtypes dictionary on a DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame on which to enforce dtypes.
- dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
- explicit_dtypes (Optional[Dict[str, str]], default None): If provided, automatic dtype coercion will respect explicitly configured dtypes (`int`, `float`, `numeric`).
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames. See `meerschaum.utils.dataframe.filter_unseen_df`.
- coerce_numeric (bool, default False): If `True`, convert float and int collisions to numeric.
- coerce_timezone (bool, default True): If `True`, convert datetimes to UTC.
- strip_timezone (bool, default False): If `coerce_timezone` and `strip_timezone` are `True`, remove timezone information from datetimes.
- debug (bool, default False): Verbosity toggle.
Returns
- The Pandas DataFrame with the types enforced.
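A minimal sketch, enforcing a nullable integer and a `json` column (expected results shown in comments):

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': [1.0, None], 'b': ['{"x": 1}', '{"x": 2}']})
df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
print(df['a'].dtype)
# Int64
print(df['b'][0])
# {'x': 1}
```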
1582def get_datetime_bound_from_df( 1583 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1584 datetime_column: str, 1585 minimum: bool = True, 1586) -> Union[int, datetime, None]: 1587 """ 1588 Return the minimum or maximum datetime (or integer) from a DataFrame. 1589 1590 Parameters 1591 ---------- 1592 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1593 The DataFrame, list, or dict which contains the range axis. 1594 1595 datetime_column: str 1596 The name of the datetime (or int) column. 1597 1598 minimum: bool 1599 Whether to return the minimum (default) or maximum value. 1600 1601 Returns 1602 ------- 1603 The minimum or maximum datetime value in the dataframe, or `None`. 1604 """ 1605 from meerschaum.utils.dtypes import to_datetime, value_is_null 1606 1607 if df is None: 1608 return None 1609 if not datetime_column: 1610 return None 1611 1612 def compare(a, b): 1613 if a is None: 1614 return b 1615 if b is None: 1616 return a 1617 if minimum: 1618 return a if a < b else b 1619 return a if a > b else b 1620 1621 if isinstance(df, list): 1622 if len(df) == 0: 1623 return None 1624 best_yet = df[0].get(datetime_column, None) 1625 for doc in df: 1626 val = doc.get(datetime_column, None) 1627 best_yet = compare(best_yet, val) 1628 return best_yet 1629 1630 if isinstance(df, dict): 1631 if datetime_column not in df: 1632 return None 1633 best_yet = df[datetime_column][0] 1634 for val in df[datetime_column]: 1635 best_yet = compare(best_yet, val) 1636 return best_yet 1637 1638 if 'DataFrame' in str(type(df)): 1639 from meerschaum.utils.dtypes import are_dtypes_equal 1640 pandas = mrsm.attempt_import('pandas') 1641 is_dask = 'dask' in df.__module__ 1642 1643 if datetime_column not in df.columns: 1644 return None 1645 1646 try: 1647 dt_val = ( 1648 df[datetime_column].min(skipna=True) 1649 if minimum 1650 else df[datetime_column].max(skipna=True) 1651 ) 1652 except Exception: 1653 dt_val = pandas.NA 1654 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1655 dt_val = dt_val.compute() 1656 1657 return ( 1658 to_datetime(dt_val, as_pydatetime=True) 1659 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1660 else (dt_val if not value_is_null(dt_val) else None) 1661 ) 1662 1663 return None
Return the minimum or maximum datetime (or integer) from a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
- datetime_column (str): The name of the datetime (or int) column.
- minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
- The minimum or maximum datetime value in the dataframe, or `None`.
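For example, with a list of documents:

```python
>>> from datetime import datetime
>>> docs = [{'dt': datetime(2024, 1, 2)}, {'dt': datetime(2024, 1, 1)}]
>>> get_datetime_bound_from_df(docs, 'dt')
datetime.datetime(2024, 1, 1, 0, 0)
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
datetime.datetime(2024, 1, 2, 0, 0)
```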
1666def get_unique_index_values( 1667 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1668 indices: List[str], 1669) -> Dict[str, List[Any]]: 1670 """ 1671 Return a dictionary of the unique index values in a DataFrame. 1672 1673 Parameters 1674 ---------- 1675 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1676 The dataframe (or list or dict) which contains index values. 1677 1678 indices: List[str] 1679 The list of index columns. 1680 1681 Returns 1682 ------- 1683 A dictionary mapping indices to unique values. 1684 """ 1685 if df is None: 1686 return {} 1687 if 'dataframe' in str(type(df)).lower(): 1688 pandas = mrsm.attempt_import('pandas') 1689 return { 1690 col: list({ 1691 (val if val is not pandas.NA else None) 1692 for val in df[col].unique() 1693 }) 1694 for col in indices 1695 if col in df.columns 1696 } 1697 1698 unique_indices = defaultdict(lambda: set()) 1699 if isinstance(df, list): 1700 for doc in df: 1701 for index in indices: 1702 if index in doc: 1703 unique_indices[index].add(doc[index]) 1704 1705 elif isinstance(df, dict): 1706 for index in indices: 1707 if index in df: 1708 unique_indices[index] = unique_indices[index].union(set(df[index])) 1709 1710 return {key: list(val) for key, val in unique_indices.items()}
Return a dictionary of the unique index values in a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
- indices (List[str]): The list of index columns.
Returns
- A dictionary mapping indices to unique values.
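For example (the ordering of the returned lists is not guaranteed, as uniqueness is tracked with sets):

```python
>>> docs = [
...     {'id': 1, 'color': 'red'},
...     {'id': 2, 'color': 'red'},
...     {'id': 2, 'color': 'blue'},
... ]
>>> get_unique_index_values(docs, ['id', 'color'])
{'id': [1, 2], 'color': ['red', 'blue']}
```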
1713def df_is_chunk_generator(df: Any) -> bool: 1714 """ 1715 Determine whether to treat `df` as a chunk generator. 1716 1717 Note this should only be used in a context where generators are expected, 1718 as it will return `True` for any iterable. 1719 1720 Parameters 1721 ---------- 1722 The DataFrame or chunk generator to evaluate. 1723 1724 Returns 1725 ------- 1726 A `bool` indicating whether to treat `df` as a generator. 1727 """ 1728 return ( 1729 not isinstance(df, (dict, list, str)) 1730 and 'DataFrame' not in str(type(df)) 1731 and isinstance(df, (Generator, Iterable, Iterator)) 1732 )
Determine whether to treat `df` as a chunk generator.
Note this should only be used in a context where generators are expected, as it will return `True` for any iterable.
Parameters
- df (Any): The DataFrame or chunk generator to evaluate.
Returns
- A `bool` indicating whether to treat `df` as a generator.
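Illustrative behavior, per the source above:

```python
>>> import pandas as pd
>>> df_is_chunk_generator(pd.DataFrame())
False
>>> df_is_chunk_generator([{'a': 1}])  # dicts, lists, and strings are excluded
False
>>> df_is_chunk_generator(pd.DataFrame() for _ in range(3))
True
```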
1735def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1736 """ 1737 Return the Dask `npartitions` value for a given `chunksize`. 1738 """ 1739 if chunksize == -1: 1740 from meerschaum.config import get_config 1741 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1742 if chunksize is None: 1743 return 1 1744 return -1 * chunksize
Return the Dask `npartitions` value for a given `chunksize`.
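Per the source above, `None` maps to a single partition, `-1` first resolves the configured SQL connector chunksize, and a positive chunksize is returned negated (to be interpreted downstream as a chunk size rather than a partition count):

```python
>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(10000)
-10000
```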
1747def df_from_literal( 1748 pipe: Optional[mrsm.Pipe] = None, 1749 literal: Optional[str] = None, 1750 debug: bool = False 1751) -> 'pd.DataFrame': 1752 """ 1753 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1754 1755 Parameters 1756 ---------- 1757 pipe: Optional['meerschaum.Pipe'], default None 1758 The pipe which will consume the literal value. 1759 1760 Returns 1761 ------- 1762 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1763 and the literal as the value. 1764 """ 1765 from meerschaum.utils.packages import import_pandas 1766 from meerschaum.utils.warnings import error, warn 1767 from meerschaum.utils.debug import dprint 1768 from meerschaum.utils.dtypes import get_current_timestamp 1769 1770 if pipe is None or literal is None: 1771 error("Please provide a Pipe and a literal value") 1772 1773 dt_col = pipe.columns.get( 1774 'datetime', 1775 mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing') 1776 ) 1777 val_col = pipe.get_val_column(debug=debug) 1778 1779 val = literal 1780 if isinstance(literal, str): 1781 if debug: 1782 dprint(f"Received literal string: '{literal}'") 1783 import ast 1784 try: 1785 val = ast.literal_eval(literal) 1786 except Exception: 1787 warn( 1788 "Failed to parse value from string:\n" + f"{literal}" + 1789 "\n\nWill cast as a string instead."\ 1790 ) 1791 val = literal 1792 1793 now = get_current_timestamp(pipe.precision) 1794 pd = import_pandas() 1795 return pd.DataFrame({dt_col: [now], val_col: [val]})
Construct a dataframe from a literal value, using the pipe's datetime and value column names.
Parameters
- pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
- literal (Optional[str], default None): The literal value from which to build the dataframe. Strings are parsed with `ast.literal_eval`, falling back to the raw string.
- debug (bool, default False): Verbosity toggle.
Returns
- A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
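A hypothetical usage sketch (the connector and metric keys below are placeholders; the value column name is resolved via `pipe.get_val_column()`):

```python
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('sql:main', 'temperature', columns={'datetime': 'dt'})
>>> df = df_from_literal(pipe, '42.1')  # '42.1' is parsed to the float 42.1
>>> len(df)
1
```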
1798def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1799 """ 1800 Return the first valid Dask DataFrame partition (if possible). 1801 """ 1802 pdf = None 1803 for partition in ddf.partitions: 1804 try: 1805 pdf = partition.compute() 1806 except Exception: 1807 continue 1808 if len(pdf) > 0: 1809 return pdf 1810 _ = mrsm.attempt_import('partd', lazy=False) 1811 return ddf.compute()
Return the first valid Dask DataFrame partition (if possible).
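A minimal sketch, assuming Dask is installed; only partitions up to the first non-empty one are computed:

```python
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({'a': range(10)}), npartitions=3)
>>> pdf = get_first_valid_dask_partition(ddf)
>>> isinstance(pdf, pd.DataFrame) and len(pdf) > 0
True
```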
1814def query_df( 1815 df: 'pd.DataFrame', 1816 params: Optional[Dict[str, Any]] = None, 1817 begin: Union[datetime, int, None] = None, 1818 end: Union[datetime, int, None] = None, 1819 datetime_column: Optional[str] = None, 1820 select_columns: Optional[List[str]] = None, 1821 omit_columns: Optional[List[str]] = None, 1822 inplace: bool = False, 1823 reset_index: bool = False, 1824 coerce_types: bool = False, 1825 debug: bool = False, 1826) -> 'pd.DataFrame': 1827 """ 1828 Query the dataframe with the params dictionary. 1829 1830 Parameters 1831 ---------- 1832 df: pd.DataFrame 1833 The DataFrame to query against. 1834 1835 params: Optional[Dict[str, Any]], default None 1836 The parameters dictionary to use for the query. 1837 1838 begin: Union[datetime, int, None], default None 1839 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1840 greater than or equal to this value. 1841 1842 end: Union[datetime, int, None], default None 1843 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1844 less than this value. 1845 1846 datetime_column: Optional[str], default None 1847 A `datetime_column` must be provided to use `begin` and `end`. 1848 1849 select_columns: Optional[List[str]], default None 1850 If provided, only return these columns. 1851 1852 omit_columns: Optional[List[str]], default None 1853 If provided, do not include these columns in the result. 1854 1855 inplace: bool, default False 1856 If `True`, modify the DataFrame inplace rather than creating a new DataFrame. 1857 1858 reset_index: bool, default False 1859 If `True`, reset the index in the resulting DataFrame. 1860 1861 coerce_types: bool, default False 1862 If `True`, cast the dataframe and parameters as strings before querying. 1863 1864 Returns 1865 ------- 1866 A Pandas DataFrame query result. 
1867 """ 1868 1869 def _process_select_columns(_df): 1870 if not select_columns: 1871 return 1872 for col in list(_df.columns): 1873 if col not in select_columns: 1874 del _df[col] 1875 1876 def _process_omit_columns(_df): 1877 if not omit_columns: 1878 return 1879 for col in list(_df.columns): 1880 if col in omit_columns: 1881 del _df[col] 1882 1883 if not params and not begin and not end: 1884 if not inplace: 1885 df = df.copy() 1886 _process_select_columns(df) 1887 _process_omit_columns(df) 1888 return df 1889 1890 from meerschaum.utils.debug import dprint 1891 from meerschaum.utils.misc import get_in_ex_params 1892 from meerschaum.utils.warnings import warn 1893 from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null 1894 dateutil_parser = mrsm.attempt_import('dateutil.parser') 1895 pandas = mrsm.attempt_import('pandas') 1896 NA = pandas.NA 1897 1898 if params: 1899 proto_in_ex_params = get_in_ex_params(params) 1900 for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items(): 1901 if proto_ex_vals: 1902 coerce_types = True 1903 break 1904 params = params.copy() 1905 for key, val in {k: v for k, v in params.items()}.items(): 1906 if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'): 1907 if None in val: 1908 val = [item for item in val if item is not None] + [NA] 1909 params[key] = val 1910 if coerce_types: 1911 params[key] = [str(x) for x in val] 1912 else: 1913 if value_is_null(val): 1914 val = NA 1915 params[key] = NA 1916 if coerce_types: 1917 params[key] = str(val) 1918 1919 dtypes = {col: str(typ) for col, typ in df.dtypes.items()} 1920 1921 if inplace: 1922 df.fillna(NA, inplace=True) 1923 else: 1924 df = df.infer_objects().fillna(NA) 1925 1926 if isinstance(begin, str): 1927 begin = dateutil_parser.parse(begin) 1928 if isinstance(end, str): 1929 end = dateutil_parser.parse(end) 1930 1931 if begin is not None or end is not None: 1932 if not datetime_column or datetime_column not in df.columns: 1933 warn( 1934 f"The datetime column '{datetime_column}' is not present in the Dataframe, " 1935 + "ignoring begin and end...", 1936 ) 1937 begin, end = None, None 1938 1939 if debug: 1940 dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}") 1941 1942 if datetime_column and (begin is not None or end is not None): 1943 if debug: 1944 dprint("Checking for datetime column compatability.") 1945 1946 from meerschaum.utils.dtypes import coerce_timezone 1947 df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime') 1948 begin_is_int = are_dtypes_equal(str(type(begin)), 'int') 1949 end_is_int = are_dtypes_equal(str(type(end)), 'int') 1950 1951 if df_is_dt: 1952 df_tz = ( 1953 getattr(df[datetime_column].dt, 'tz', None) 1954 if hasattr(df[datetime_column], 'dt') 1955 else None 1956 ) 1957 1958 if begin_is_int: 1959 begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None) 1960 if debug: 1961 dprint(f"`begin` will be cast to '{begin}'.") 1962 if end_is_int: 1963 end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None) 1964 if debug: 1965 dprint(f"`end` will be cast to '{end}'.") 1966 1967 begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None 1968 end = coerce_timezone(end, strip_utc=(df_tz is None)) if begin is not None else None 1969 1970 in_ex_params = get_in_ex_params(params) 1971 1972 masks = [ 1973 ( 1974 (df[datetime_column] >= begin) 1975 if begin is not None and datetime_column 1976 else True 1977 ) & ( 1978 (df[datetime_column] < end) 1979 if end 
is not None and datetime_column 1980 else True 1981 ) 1982 ] 1983 1984 masks.extend([ 1985 ( 1986 ( 1987 (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals) 1988 if in_vals 1989 else True 1990 ) & ( 1991 ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals) 1992 if ex_vals 1993 else True 1994 ) 1995 ) 1996 for col, (in_vals, ex_vals) in in_ex_params.items() 1997 if col in df.columns 1998 ]) 1999 query_mask = masks[0] 2000 for mask in masks[1:]: 2001 query_mask = query_mask & mask 2002 2003 original_cols = df.columns 2004 2005 ### NOTE: We must cast bool columns to `boolean[pyarrow]` 2006 ### to allow for `<NA>` values. 2007 bool_cols = [ 2008 col 2009 for col, typ in df.dtypes.items() 2010 if are_dtypes_equal(str(typ), 'bool') 2011 ] 2012 for col in bool_cols: 2013 df[col] = df[col].astype('boolean[pyarrow]') 2014 2015 if not isinstance(query_mask, bool): 2016 df['__mrsm_mask'] = ( 2017 query_mask.astype('boolean[pyarrow]') 2018 if hasattr(query_mask, 'astype') 2019 else query_mask 2020 ) 2021 2022 if inplace: 2023 df.where(query_mask, other=NA, inplace=True) 2024 df.dropna(how='all', inplace=True) 2025 result_df = df 2026 else: 2027 result_df = df.where(query_mask, other=NA) 2028 result_df.dropna(how='all', inplace=True) 2029 2030 else: 2031 result_df = df 2032 2033 if '__mrsm_mask' in df.columns: 2034 del df['__mrsm_mask'] 2035 if '__mrsm_mask' in result_df.columns: 2036 del result_df['__mrsm_mask'] 2037 2038 if reset_index: 2039 result_df.reset_index(drop=True, inplace=True) 2040 2041 result_df = enforce_dtypes( 2042 result_df, 2043 dtypes, 2044 safe_copy=False, 2045 debug=debug, 2046 coerce_numeric=False, 2047 coerce_timezone=False, 2048 ) 2049 2050 if select_columns == ['*']: 2051 select_columns = None 2052 2053 if not select_columns and not omit_columns: 2054 return result_df[original_cols] 2055 2056 _process_select_columns(result_df) 2057 _process_omit_columns(result_df) 2058 2059 return result_df
Query the dataframe with the params dictionary.
Parameters
- df (pd.DataFrame): The DataFrame to query against.
- params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
- begin (Union[datetime, int, None], default None): If `begin` and `datetime_column` are provided, only return rows with a timestamp greater than or equal to this value.
- end (Union[datetime, int, None], default None): If `end` and `datetime_column` are provided, only return rows with a timestamp less than this value.
- datetime_column (Optional[str], default None): A `datetime_column` must be provided to use `begin` and `end`.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
- inplace (bool, default False): If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
- reset_index (bool, default False): If `True`, reset the index in the resulting DataFrame.
- coerce_types (bool, default False): If `True`, cast the dataframe and parameters as strings before querying.
- debug (bool, default False): Verbosity toggle.
Returns
- A Pandas DataFrame query result.
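Illustrative queries (output formatting is approximate):

```python
>>> import pandas as pd
>>> from datetime import datetime
>>> df = pd.DataFrame({
...     'dt': [datetime(2024, 1, 1), datetime(2024, 1, 2)],
...     'a': [1, 2],
... })
>>> query_df(df, params={'a': [2]}, reset_index=True)
          dt  a
0 2024-01-02  2
>>> query_df(df, begin=datetime(2024, 1, 2), datetime_column='dt', reset_index=True)
          dt  a
0 2024-01-02  2
```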
2062def to_json( 2063 df: 'pd.DataFrame', 2064 safe_copy: bool = True, 2065 orient: str = 'records', 2066 date_format: str = 'iso', 2067 date_unit: str = 'us', 2068 double_precision: int = 15, 2069 geometry_format: str = 'geojson', 2070 **kwargs: Any 2071) -> str: 2072 """ 2073 Serialize the given dataframe as a JSON string. 2074 2075 Parameters 2076 ---------- 2077 df: pd.DataFrame 2078 The DataFrame to be serialized. 2079 2080 safe_copy: bool, default True 2081 If `False`, modify the DataFrame inplace. 2082 2083 date_format: str, default 'iso' 2084 The default format for timestamps. 2085 2086 date_unit: str, default 'us' 2087 The precision of the timestamps. 2088 2089 double_precision: int, default 15 2090 The number of decimal places to use when encoding floating point values (maximum 15). 2091 2092 geometry_format: str, default 'geojson' 2093 The serialization format for geometry data. 2094 Accepted values are `geojson`, `wkb_hex`, and `wkt`. 2095 2096 Returns 2097 ------- 2098 A JSON string. 2099 """ 2100 import warnings 2101 import functools 2102 from meerschaum.utils.packages import import_pandas 2103 from meerschaum.utils.dtypes import ( 2104 serialize_bytes, 2105 serialize_decimal, 2106 serialize_geometry, 2107 serialize_date, 2108 ) 2109 pd = import_pandas() 2110 uuid_cols = get_uuid_cols(df) 2111 bytes_cols = get_bytes_cols(df) 2112 numeric_cols = get_numeric_cols(df) 2113 date_cols = get_date_cols(df) 2114 geometry_cols = get_geometry_cols(df) 2115 geometry_cols_srids = { 2116 col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0') 2117 for col in geometry_cols 2118 } if 'geodataframe' in str(type(df)).lower() else {} 2119 if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols): 2120 df = df.copy() 2121 if 'geodataframe' in str(type(df)).lower(): 2122 geometry_data = { 2123 col: df[col] 2124 for col in geometry_cols 2125 } 2126 df = pd.DataFrame({ 2127 col: df[col] 2128 for col in df.columns 2129 if col not in geometry_cols 2130 }) 2131 for col in geometry_cols: 2132 df[col] = pd.Series(ob for ob in geometry_data[col]) 2133 for col in uuid_cols: 2134 df[col] = df[col].astype(str) 2135 for col in bytes_cols: 2136 df[col] = df[col].apply(serialize_bytes) 2137 for col in numeric_cols: 2138 df[col] = df[col].apply(serialize_decimal) 2139 for col in date_cols: 2140 df[col] = df[col].apply(serialize_date) 2141 with warnings.catch_warnings(): 2142 warnings.simplefilter("ignore") 2143 for col in geometry_cols: 2144 srid = geometry_cols_srids.get(col, None) or None 2145 df[col] = pd.Series( 2146 serialize_geometry(val, geometry_format=geometry_format, srid=srid) 2147 for val in df[col] 2148 ) 2149 return df.infer_objects().fillna(pd.NA).to_json( 2150 date_format=date_format, 2151 date_unit=date_unit, 2152 double_precision=double_precision, 2153 orient=orient, 2154 **kwargs 2155 )
Serialize the given dataframe as a JSON string.
Parameters
- df (pd.DataFrame): The DataFrame to be serialized.
- safe_copy (bool, default True): If `False`, modify the DataFrame inplace.
- orient (str, default 'records'): The JSON orientation, passed through to `DataFrame.to_json()`.
- date_format (str, default 'iso'): The default format for timestamps.
- date_unit (str, default 'us'): The precision of the timestamps.
- double_precision (int, default 15): The number of decimal places to use when encoding floating point values (maximum 15).
- geometry_format (str, default 'geojson'): The serialization format for geometry data. Accepted values are `geojson`, `wkb_hex`, and `wkt`.
Returns
- A JSON string.
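For example (the serialized output is illustrative; timestamps default to ISO format at microsecond precision):

```python
>>> import pandas as pd
>>> from datetime import datetime
>>> df = pd.DataFrame({'id': [1], 'dt': [datetime(2024, 1, 1)]})
>>> to_json(df)
'[{"id":1,"dt":"2024-01-01T00:00:00.000000"}]'
```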
2158def to_simple_lines(df: 'pd.DataFrame') -> str: 2159 """ 2160 Serialize a Pandas Dataframe as lines of simple dictionaries. 2161 2162 Parameters 2163 ---------- 2164 df: pd.DataFrame 2165 The dataframe to serialize into simple lines text. 2166 2167 Returns 2168 ------- 2169 A string of simple line dictionaries joined by newlines. 2170 """ 2171 from meerschaum.utils.misc import to_simple_dict 2172 if df is None or len(df) == 0: 2173 return '' 2174 2175 docs = df.to_dict(orient='records') 2176 return '\n'.join(to_simple_dict(doc) for doc in docs)
Serialize a Pandas DataFrame as lines of simple dictionaries.
Parameters
- df (pd.DataFrame): The dataframe to serialize into simple lines text.
Returns
- A string of simple line dictionaries joined by newlines.
2179def parse_simple_lines(data: str) -> 'pd.DataFrame': 2180 """ 2181 Parse simple lines text into a DataFrame. 2182 2183 Parameters 2184 ---------- 2185 data: str 2186 The simple lines text to parse into a DataFrame. 2187 2188 Returns 2189 ------- 2190 A dataframe containing the rows serialized in `data`. 2191 """ 2192 from meerschaum.utils.misc import string_to_dict 2193 from meerschaum.utils.packages import import_pandas 2194 pd = import_pandas() 2195 lines = data.splitlines() 2196 try: 2197 docs = [string_to_dict(line) for line in lines] 2198 df = pd.DataFrame(docs) 2199 except Exception: 2200 df = None 2201 2202 if df is None: 2203 raise ValueError("Cannot parse simple lines into a dataframe.") 2204 2205 return df
Parse simple lines text into a DataFrame.
Parameters
- data (str): The simple lines text to parse into a DataFrame.
Returns
- A dataframe containing the rows serialized in `data`.
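A round-trip sketch with `to_simple_lines` (the exact line format is defined by `meerschaum.utils.misc.to_simple_dict`, so the intermediate text is not shown):

```python
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}])
>>> text = to_simple_lines(df)
>>> df2 = parse_simple_lines(text)
>>> len(df2) == len(df)
True
```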