meerschaum.utils.dataframe
Utility functions for working with DataFrames.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations

from datetime import datetime, timezone, date
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING, Tuple,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which null columns are added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    coerce_mixed_numerics: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    coerce_mixed_numerics: bool, default True
        If `True`, cast mixed integer and float columns between the old and new dataframes into
        numeric values (`decimal.Decimal`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone,
        serialize_decimal,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    numeric_cols_precisions_scales = {
        col: get_numeric_precision_scale(None, typ)
        for col, typ in dtypes.items()
        if col and str(typ).lower().startswith('numeric')
    }

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
            strip_utc = (
                _dtypes_col_dtype.startswith('datetime64')
                and 'utc' not in _dtypes_col_dtype.lower()
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                coerce_mixed_numerics
                and
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
        strip_utc = (
            _dtypes_col_dtype.startswith('datetime64')
            and 'utc' not in _dtypes_col_dtype.lower()
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    old_geometry_cols = get_geometry_cols(old_df)
    new_geometry_cols = get_geometry_cols(new_df)
    geometry_cols = set(new_geometry_cols + old_geometry_cols)

    na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$'
    joined_df = merge(
        new_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA),
        old_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    delta_json_cols = get_json_cols(delta_df)
    for json_col in json_cols:
        if (
            json_col in delta_json_cols
            or json_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(
                lambda x: (json.loads(x) if isinstance(x, str) else x)
            )
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    delta_numeric_cols = get_numeric_cols(delta_df)
    for numeric_col in numeric_cols:
        if (
            numeric_col in delta_numeric_cols
            or numeric_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    delta_uuid_cols = get_uuid_cols(delta_df)
    for uuid_col in uuid_cols:
        if (
            uuid_col in delta_uuid_cols
            or uuid_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    delta_bytes_cols = get_bytes_cols(delta_df)
    for bytes_col in bytes_cols:
        if (
            bytes_col in delta_bytes_cols
            or bytes_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    delta_geometry_cols = get_geometry_cols(delta_df)
    for geometry_col in geometry_cols:
        if (
            geometry_col in delta_geometry_cols
            or geometry_col not in delta_df.columns
        ):
            continue
        try:
            delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col])
        except Exception:
            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    ignore_all: bool = False,
    precision_unit: Optional[str] = None,
    coerce_utc: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    ignore_all: bool, default False
        If `True`, do not attempt to cast any columns to datetimes.

    precision_unit: Optional[str], default None
        If provided, enforce the given precision on the coerced datetime columns.

    coerce_utc: bool, default True
        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.
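
# Illustrative sketch (not part of the module source): `filter_unseen_df`
# returns only the rows of the new dataframe which are absent from (or changed
# relative to) the old one, so a repeated sync produces just the delta.
#
# >>> import pandas as pd
# >>> old_df = pd.DataFrame({'id': [1, 2], 'val': ['a', 'b']})
# >>> new_df = pd.DataFrame({'id': [1, 2, 3], 'val': ['a', 'B', 'c']})
# >>> filter_unseen_df(old_df, new_df)
#    id val
# 0   2   B
# 1   3   c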

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df2 = parse_df_datetimes(df)
    >>> df2.dtypes
    a    datetime64[us, UTC]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
    import traceback

    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### If df is a dict, build a DataFrame.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### Skip parsing if the DataFrame is empty.
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [
        col
        for col in pdf.columns
        if col not in ignore_cols
    ] if not ignore_all else []

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    ### Apply a regex to columns to determine which are ISO datetimes.
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### List of datetime column names.
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    def _parse_to_datetime(x):
        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                _parse_to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: MRSM_PD_DTYPES['datetime']
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain JSON-encodable objects (dicts or lists)
    from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain dicts or lists.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance(df.loc[ix][col], (dict, list))
        )
    ]
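
# Illustrative sketch (not part of the module source): columns holding dicts
# or lists are detected by sampling the first valid value of each column.
#
# >>> import pandas as pd
# >>> df = pd.DataFrame({'meta': [{'color': 'red'}], 'tags': [['a', 'b']], 'n': [1]})
# >>> get_json_cols(df)
# ['meta', 'tags']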
683 """ 684 if df is None: 685 return [] 686 687 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 688 if is_dask: 689 df = get_first_valid_dask_partition(df) 690 691 if len(df) == 0: 692 return [] 693 694 cols_indices = { 695 col: df[col].first_valid_index() 696 for col in df.columns 697 } 698 return [ 699 col 700 for col, ix in cols_indices.items() 701 if ( 702 ix is not None 703 and isinstance(df.loc[ix][col], (dict, list)) 704 ) 705 ] 706 707 708def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 709 """ 710 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 711 712 Parameters 713 ---------- 714 df: pd.DataFrame 715 The DataFrame which may contain decimal objects. 716 717 Returns 718 ------- 719 A list of columns to treat as numerics. 720 """ 721 if df is None: 722 return [] 723 from decimal import Decimal 724 is_dask = 'dask' in df.__module__ 725 if is_dask: 726 df = get_first_valid_dask_partition(df) 727 728 if len(df) == 0: 729 return [] 730 731 cols_indices = { 732 col: df[col].first_valid_index() 733 for col in df.columns 734 } 735 return [ 736 col 737 for col, ix in cols_indices.items() 738 if ( 739 ix is not None 740 and 741 isinstance(df.loc[ix][col], Decimal) 742 ) 743 ] 744 745 746def get_bool_cols(df: 'pd.DataFrame') -> List[str]: 747 """ 748 Get the columns which contain `bool` objects from a Pandas DataFrame. 749 750 Parameters 751 ---------- 752 df: pd.DataFrame 753 The DataFrame which may contain bools. 754 755 Returns 756 ------- 757 A list of columns to treat as bools. 758 """ 759 if df is None: 760 return [] 761 762 is_dask = 'dask' in df.__module__ 763 if is_dask: 764 df = get_first_valid_dask_partition(df) 765 766 if len(df) == 0: 767 return [] 768 769 from meerschaum.utils.dtypes import are_dtypes_equal 770 771 return [ 772 col 773 for col, typ in df.dtypes.items() 774 if are_dtypes_equal(str(typ), 'bool') 775 ] 776 777 778def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 779 """ 780 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 781 782 Parameters 783 ---------- 784 df: pd.DataFrame 785 The DataFrame which may contain UUID objects. 786 787 Returns 788 ------- 789 A list of columns to treat as UUIDs. 790 """ 791 if df is None: 792 return [] 793 from uuid import UUID 794 is_dask = 'dask' in df.__module__ 795 if is_dask: 796 df = get_first_valid_dask_partition(df) 797 798 if len(df) == 0: 799 return [] 800 801 cols_indices = { 802 col: df[col].first_valid_index() 803 for col in df.columns 804 } 805 return [ 806 col 807 for col, ix in cols_indices.items() 808 if ( 809 ix is not None 810 and 811 isinstance(df.loc[ix][col], UUID) 812 ) 813 ] 814 815 816def get_datetime_cols( 817 df: 'pd.DataFrame', 818 timezone_aware: bool = True, 819 timezone_naive: bool = True, 820 with_tz_precision: bool = False, 821) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]: 822 """ 823 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 824 825 Parameters 826 ---------- 827 df: pd.DataFrame 828 The DataFrame which may contain `datetime` or `Timestamp` objects. 829 830 timezone_aware: bool, default True 831 If `True`, include timezone-aware datetime columns. 832 833 timezone_naive: bool, default True 834 If `True`, include timezone-naive datetime columns. 835 836 with_tz_precision: bool, default False 837 If `True`, return a dictionary mapping column names to tuples in the form 838 `(timezone, precision)`. 


def get_datetime_cols(
    df: 'pd.DataFrame',
    timezone_aware: bool = True,
    timezone_naive: bool = True,
    with_tz_precision: bool = False,
) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
    """
    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain `datetime` or `Timestamp` objects.

    timezone_aware: bool, default True
        If `True`, include timezone-aware datetime columns.

    timezone_naive: bool, default True
        If `True`, include timezone-naive datetime columns.

    with_tz_precision: bool, default False
        If `True`, return a dictionary mapping column names to tuples in the form
        `(timezone, precision)`.

    Returns
    -------
    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
    (if `with_tz_precision` is `True`).
    """
    if not timezone_aware and not timezone_naive:
        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")

    if df is None:
        return [] if not with_tz_precision else {}

    from datetime import datetime
    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
        """
        Extract the tz + precision tuple from a dtype string.
        """
        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
        tz = (
            None
            if ',' not in meta_str
            else meta_str.split(',', maxsplit=1)[-1]
        )
        precision_abbreviation = (
            meta_str
            if ',' not in meta_str
            else meta_str.split(',')[0]
        )
        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
        return tz, precision

    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
        """
        Return the tz + precision tuple from a Python datetime object.
        """
        return dt.tzname(), 'microsecond'

    known_dt_cols_types = {
        col: str(typ)
        for col, typ in df.dtypes.items()
        if are_dtypes_equal('datetime', str(typ))
    }

    known_dt_cols_tuples = {
        col: get_tz_precision_from_dtype(typ)
        for col, typ in known_dt_cols_types.items()
    }

    if len(df) == 0:
        return (
            list(known_dt_cols_types)
            if not with_tz_precision
            else known_dt_cols_tuples
        )

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_dt_cols_types
    }
    pydt_cols_tuples = {
        col: get_tz_precision_from_datetime(sample_val)
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance((sample_val := df.loc[ix][col]), datetime)
        )
    }

    dt_cols_tuples = {
        **known_dt_cols_tuples,
        **pydt_cols_tuples
    }

    all_dt_cols_tuples = {
        col: dt_cols_tuples[col]
        for col in df.columns
        if col in dt_cols_tuples
    }
    if timezone_aware and timezone_naive:
        return (
            list(all_dt_cols_tuples)
            if not with_tz_precision
            else all_dt_cols_tuples
        )

    known_timezone_aware_dt_cols = [
        col
        for col in known_dt_cols_types
        if getattr(df[col], 'tz', None) is not None
    ]
    timezone_aware_pydt_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in pydt_cols_tuples.items()
        if df.loc[cols_indices[col]][col].tzinfo is not None
    }
    timezone_aware_dt_cols_set = set(
        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
    )
    timezone_aware_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in all_dt_cols_tuples.items()
        if col in timezone_aware_dt_cols_set
    }
    timezone_naive_cols_tuples = {
        col: (tz, precision)
        for col, (tz, precision) in all_dt_cols_tuples.items()
        if col not in timezone_aware_dt_cols_set
    }

    if timezone_aware:
        return (
            list(timezone_aware_cols_tuples)
            if not with_tz_precision
            else timezone_aware_cols_tuples
        )

    return (
        list(timezone_naive_cols_tuples)
        if not with_tz_precision
        else timezone_naive_cols_tuples
    )
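
# Illustrative sketch (not part of the module source): detect datetime columns,
# whether tz-aware or tz-naive; pass `timezone_aware=False` or
# `timezone_naive=False` to filter by awareness.
#
# >>> from datetime import datetime, timezone
# >>> import pandas as pd
# >>> df = pd.DataFrame({
# ...     'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
# ...     'naive': [datetime(2025, 1, 1)],
# ... })
# >>> get_datetime_cols(df)
# ['aware', 'naive']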


def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
    """
    Return a dictionary mapping datetime columns to specific type strings.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain datetime columns.

    Returns
    -------
    A dictionary mapping the datetime columns' names to dtype strings
    (containing timezone and precision metadata).

    Examples
    --------
    >>> from datetime import datetime, timezone
    >>> import pandas as pd
    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
    >>> get_datetime_cols_types(df)
    {'dt_tz_aware': 'datetime64[us, UTC]'}
    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
    >>> get_datetime_cols_types(df)
    {'distant_dt': 'datetime64[us]'}
    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
    >>> get_datetime_cols_types(df)
    {'dt_second': 'datetime64[s]'}
    """
    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
    if not dt_cols_tuples:
        return {}

    return {
        col: (
            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
            if tz is None
            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
        )
        for col, (tz, precision) in dt_cols_tuples.items()
    }


def get_date_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the `date` columns from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain dates.

    Returns
    -------
    A list of columns to treat as dates.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    if df is None:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    known_date_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(typ, 'date')
    ]

    if len(df) == 0:
        return known_date_cols

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_date_cols
    }
    object_date_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance(df.loc[ix][col], date)
        )
    ]

    all_date_cols = set(known_date_cols + object_date_cols)

    return [
        col
        for col in df.columns
        if col in all_date_cols
    ]


def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain bytes strings from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain bytes strings.

    Returns
    -------
    A list of columns to treat as bytes.
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    known_bytes_cols = [
        col
        for col, typ in df.dtypes.items()
        if str(typ) == 'binary[pyarrow]'
    ]

    if len(df) == 0:
        return known_bytes_cols

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
        if col not in known_bytes_cols
    }
    object_bytes_cols = [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance(df.loc[ix][col], bytes)
        )
    ]

    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)

    return [
        col
        for col in df.columns
        if col in all_bytes_cols
    ]
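
# Illustrative sketch (not part of the module source): `date` objects and raw
# bytes are detected with the same sampling approach.
#
# >>> from datetime import date
# >>> import pandas as pd
# >>> df = pd.DataFrame({'day': [date(2025, 1, 1)], 'blob': [b'\x00\x01']})
# >>> get_date_cols(df)
# ['day']
# >>> get_bytes_cols(df)
# ['blob']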
1078 """ 1079 if df is None: 1080 return [] 1081 1082 is_dask = 'dask' in df.__module__ 1083 if is_dask: 1084 df = get_first_valid_dask_partition(df) 1085 1086 known_bytes_cols = [ 1087 col 1088 for col, typ in df.dtypes.items() 1089 if str(typ) == 'binary[pyarrow]' 1090 ] 1091 1092 if len(df) == 0: 1093 return known_bytes_cols 1094 1095 cols_indices = { 1096 col: df[col].first_valid_index() 1097 for col in df.columns 1098 if col not in known_bytes_cols 1099 } 1100 object_bytes_cols = [ 1101 col 1102 for col, ix in cols_indices.items() 1103 if ( 1104 ix is not None 1105 and isinstance(df.loc[ix][col], bytes) 1106 ) 1107 ] 1108 1109 all_bytes_cols = set(known_bytes_cols + object_bytes_cols) 1110 1111 return [ 1112 col 1113 for col in df.columns 1114 if col in all_bytes_cols 1115 ] 1116 1117 1118def get_geometry_cols( 1119 df: 'pd.DataFrame', 1120 with_types_srids: bool = False, 1121) -> Union[List[str], Dict[str, Any]]: 1122 """ 1123 Get the columns which contain shapely objects from a Pandas DataFrame. 1124 1125 Parameters 1126 ---------- 1127 df: pd.DataFrame 1128 The DataFrame which may contain bytes strings. 1129 1130 with_types_srids: bool, default False 1131 If `True`, return a dictionary mapping columns to geometry types and SRIDs. 1132 1133 Returns 1134 ------- 1135 A list of columns to treat as `geometry`. 1136 If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID). 1137 """ 1138 if df is None: 1139 return [] if not with_types_srids else {} 1140 1141 is_dask = 'dask' in df.__module__ 1142 if is_dask: 1143 df = get_first_valid_dask_partition(df) 1144 1145 if len(df) == 0: 1146 return [] if not with_types_srids else {} 1147 1148 cols_indices = { 1149 col: df[col].first_valid_index() 1150 for col in df.columns 1151 } 1152 geo_cols = [ 1153 col 1154 for col, ix in cols_indices.items() 1155 if ( 1156 ix is not None 1157 and 1158 'shapely' in str(type(df.loc[ix][col])) 1159 ) 1160 ] 1161 if not with_types_srids: 1162 return geo_cols 1163 1164 gpd = mrsm.attempt_import('geopandas', lazy=False) 1165 geo_cols_types_srids = {} 1166 for col in geo_cols: 1167 try: 1168 sample_geo_series = gpd.GeoSeries(df[col], crs=None) 1169 geometry_types = { 1170 geom.geom_type 1171 for geom in sample_geo_series 1172 if hasattr(geom, 'geom_type') 1173 } 1174 geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series) 1175 srid = ( 1176 ( 1177 sample_geo_series.crs.sub_crs_list[0].to_epsg() 1178 if sample_geo_series.crs.is_compound 1179 else sample_geo_series.crs.to_epsg() 1180 ) 1181 if sample_geo_series.crs 1182 else 0 1183 ) 1184 geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry' 1185 if geometry_type != 'geometry' and geometry_has_z: 1186 geometry_type = geometry_type + 'Z' 1187 except Exception: 1188 srid = 0 1189 geometry_type = 'geometry' 1190 geo_cols_types_srids[col] = (geometry_type, srid) 1191 1192 return geo_cols_types_srids 1193 1194 1195def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 1196 """ 1197 Return a dtypes dictionary mapping columns to specific geometry types (type, srid). 
1198 """ 1199 geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True) 1200 new_cols_types = {} 1201 for col, (geometry_type, srid) in geometry_cols_types_srids.items(): 1202 new_dtype = "geometry" 1203 modifier = "" 1204 if not srid and geometry_type.lower() == 'geometry': 1205 new_cols_types[col] = new_dtype 1206 continue 1207 1208 modifier = "[" 1209 if geometry_type.lower() != 'geometry': 1210 modifier += f"{geometry_type}" 1211 1212 if srid: 1213 if modifier != '[': 1214 modifier += ", " 1215 modifier += f"{srid}" 1216 modifier += "]" 1217 new_cols_types[col] = f"{new_dtype}{modifier}" 1218 return new_cols_types 1219 1220 1221def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]: 1222 """ 1223 Return a dtypes dictionary mapping special columns to their dtypes. 1224 """ 1225 return { 1226 **{col: 'json' for col in get_json_cols(df)}, 1227 **{col: 'uuid' for col in get_uuid_cols(df)}, 1228 **{col: 'bytes' for col in get_bytes_cols(df)}, 1229 **{col: 'bool' for col in get_bool_cols(df)}, 1230 **{col: 'numeric' for col in get_numeric_cols(df)}, 1231 **{col: 'date' for col in get_date_cols(df)}, 1232 **get_datetime_cols_types(df), 1233 **get_geometry_cols_types(df), 1234 } 1235 1236 1237def enforce_dtypes( 1238 df: 'pd.DataFrame', 1239 dtypes: Dict[str, str], 1240 explicit_dtypes: Optional[Dict[str, str]] = None, 1241 safe_copy: bool = True, 1242 coerce_numeric: bool = False, 1243 coerce_timezone: bool = True, 1244 strip_timezone: bool = False, 1245 debug: bool = False, 1246) -> 'pd.DataFrame': 1247 """ 1248 Enforce the `dtypes` dictionary on a DataFrame. 1249 1250 Parameters 1251 ---------- 1252 df: pd.DataFrame 1253 The DataFrame on which to enforce dtypes. 1254 1255 dtypes: Dict[str, str] 1256 The data types to attempt to enforce on the DataFrame. 1257 1258 explicit_dtypes: Optional[Dict[str, str]], default None 1259 If provided, automatic dtype coersion will respect explicitly configured 1260 dtypes (`int`, `float`, `numeric`). 1261 1262 safe_copy: bool, default True 1263 If `True`, create a copy before comparing and modifying the dataframes. 1264 Setting to `False` may mutate the DataFrames. 1265 See `meerschaum.utils.dataframe.filter_unseen_df`. 1266 1267 coerce_numeric: bool, default False 1268 If `True`, convert float and int collisions to numeric. 1269 1270 coerce_timezone: bool, default True 1271 If `True`, convert datetimes to UTC. 1272 1273 strip_timezone: bool, default False 1274 If `coerce_timezone` and `strip_timezone` are `True`, 1275 remove timezone information from datetimes. 1276 1277 debug: bool, default False 1278 Verbosity toggle. 1279 1280 Returns 1281 ------- 1282 The Pandas DataFrame with the types enforced. 1283 """ 1284 import json 1285 import functools 1286 from meerschaum.utils.debug import dprint 1287 from meerschaum.utils.formatting import pprint 1288 from meerschaum.utils.dtypes import ( 1289 are_dtypes_equal, 1290 to_pandas_dtype, 1291 is_dtype_numeric, 1292 attempt_cast_to_numeric, 1293 attempt_cast_to_uuid, 1294 attempt_cast_to_bytes, 1295 attempt_cast_to_geometry, 1296 coerce_timezone as _coerce_timezone, 1297 get_geometry_type_srid, 1298 ) 1299 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 1300 pandas = mrsm.attempt_import('pandas') 1301 is_dask = 'dask' in df.__module__ 1302 if safe_copy: 1303 df = df.copy() 1304 if len(df.columns) == 0: 1305 if debug: 1306 dprint("Incoming DataFrame has no columns. 


def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    explicit_dtypes: Optional[Dict[str, str]] = None,
    safe_copy: bool = True,
    coerce_numeric: bool = False,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    explicit_dtypes: Optional[Dict[str, str]], default None
        If provided, automatic dtype coercion will respect explicitly configured
        dtypes (`int`, `float`, `numeric`).

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default False
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import functools
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        attempt_cast_to_geometry,
        coerce_timezone as _coerce_timezone,
        get_geometry_type_srid,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    explicit_dtypes = explicit_dtypes or {}
    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ.startswith('numeric')
    ]
    geometry_cols_types_srids = {
        col: get_geometry_type_srid(typ, default_srid=0)
        for col, typ in dtypes.items()
        if typ.startswith('geometry') or typ.startswith('geography')
    }
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    bytes_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'bytes'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        functools.partial(
                            attempt_cast_to_numeric,
                            quantize=True,
                            precision=precision,
                            scale=scale,
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if bytes_cols:
        if debug:
            dprint(f"Checking for bytes: {bytes_cols}")
        for col in bytes_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_bytes)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
                    if debug:
                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
                    continue
                if strip_timezone and ',' not in str(df.dtypes[col]):
                    if debug:
                        dprint(
                            f"Skip UTC coercion (stripped) for column '{col}' "
                            f"({str(df[col].dtype)})."
                        )
                    continue

                if debug:
                    dprint(
                        f"Data type for column '{col}' before timezone coercion: "
                        f"{str(df[col].dtype)}"
                    )

                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
                if debug:
                    dprint(
                        f"Data type for column '{col}' after timezone coercion: "
                        f"{str(df[col].dtype)}"
                    )

    if geometry_cols_types_srids:
        geopandas = mrsm.attempt_import('geopandas')
        if debug:
            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
        parsed_geom_cols = []
        for col in geometry_cols_types_srids:
            if col not in df.columns:
                continue
            try:
                df[col] = attempt_cast_to_geometry(df[col])
                parsed_geom_cols.append(col)
            except Exception as e:
                import traceback
                traceback.print_exc()
                if debug:
                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")

        if parsed_geom_cols:
            if debug:
                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
            try:
                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
                for col, (_, srid) in geometry_cols_types_srids.items():
                    if srid:
                        if debug:
                            dprint(f"Setting '{col}' to SRID '{srid}'...")
                        _ = df[col].set_crs(srid)
                if parsed_geom_cols[0] not in df.columns:
                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
            except (ValueError, TypeError):
                import traceback
                dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
        all_nan = (
            df[col].isnull().all()
            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
            else None
        )
        cast_to_numeric = explicitly_numeric or (
            (
                col in df_numeric_cols
                or (
                    mixed_numeric_types
                    and not (explicitly_float or explicitly_int)
                    and not all_nan
                    and coerce_numeric
                )
            )
        )

        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
            from meerschaum.utils.formatting import make_header
            msg = (
                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
                + "\n"
                + f"  Previous type: {previous_typ}\n"
                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
            ) if cast_to_numeric else (
                f"Will not coerce column '{col}' to numeric.\n"
                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
                f"  Mixed numeric types: {mixed_numeric_types}\n"
                f"  Explicitly float: {explicitly_float}\n"
                f"  Explicitly int: {explicitly_int}\n"
                f"  All NaN: {all_nan}\n"
                f"  Coerce numeric: {coerce_numeric}"
            )
            dprint(msg)

        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
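
# Illustrative sketch (not part of the module source): enforce Meerschaum
# dtypes (here `Int64` and `json`) onto an incoming DataFrame. Note the
# `int`-targeted cast falls back to casting through `float64` on failure.
#
# >>> import pandas as pd
# >>> df = pd.DataFrame({'a': [1.0], 'meta': ['{"b": 2}']})
# >>> df = enforce_dtypes(df, {'a': 'Int64', 'meta': 'json'})
# >>> df['a'].dtype
# Int64Dtype()
# >>> df['meta'][0]
# {'b': 2}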
1604 """ 1605 from meerschaum.utils.dtypes import to_datetime, value_is_null 1606 1607 if df is None: 1608 return None 1609 if not datetime_column: 1610 return None 1611 1612 def compare(a, b): 1613 if a is None: 1614 return b 1615 if b is None: 1616 return a 1617 if minimum: 1618 return a if a < b else b 1619 return a if a > b else b 1620 1621 if isinstance(df, list): 1622 if len(df) == 0: 1623 return None 1624 best_yet = df[0].get(datetime_column, None) 1625 for doc in df: 1626 val = doc.get(datetime_column, None) 1627 best_yet = compare(best_yet, val) 1628 return best_yet 1629 1630 if isinstance(df, dict): 1631 if datetime_column not in df: 1632 return None 1633 best_yet = df[datetime_column][0] 1634 for val in df[datetime_column]: 1635 best_yet = compare(best_yet, val) 1636 return best_yet 1637 1638 if 'DataFrame' in str(type(df)): 1639 from meerschaum.utils.dtypes import are_dtypes_equal 1640 pandas = mrsm.attempt_import('pandas') 1641 is_dask = 'dask' in df.__module__ 1642 1643 if datetime_column not in df.columns: 1644 return None 1645 1646 try: 1647 dt_val = ( 1648 df[datetime_column].min(skipna=True) 1649 if minimum 1650 else df[datetime_column].max(skipna=True) 1651 ) 1652 except Exception: 1653 dt_val = pandas.NA 1654 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1655 dt_val = dt_val.compute() 1656 1657 return ( 1658 to_datetime(dt_val, as_pydatetime=True) 1659 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1660 else (dt_val if not value_is_null(dt_val) else None) 1661 ) 1662 1663 return None 1664 1665 1666def get_unique_index_values( 1667 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1668 indices: List[str], 1669) -> Dict[str, List[Any]]: 1670 """ 1671 Return a dictionary of the unique index values in a DataFrame. 1672 1673 Parameters 1674 ---------- 1675 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1676 The dataframe (or list or dict) which contains index values. 1677 1678 indices: List[str] 1679 The list of index columns. 1680 1681 Returns 1682 ------- 1683 A dictionary mapping indices to unique values. 1684 """ 1685 if df is None: 1686 return {} 1687 if 'dataframe' in str(type(df)).lower(): 1688 pandas = mrsm.attempt_import('pandas') 1689 return { 1690 col: list({ 1691 (val if val is not pandas.NA else None) 1692 for val in df[col].unique() 1693 }) 1694 for col in indices 1695 if col in df.columns 1696 } 1697 1698 unique_indices = defaultdict(lambda: set()) 1699 if isinstance(df, list): 1700 for doc in df: 1701 for index in indices: 1702 if index in doc: 1703 unique_indices[index].add(doc[index]) 1704 1705 elif isinstance(df, dict): 1706 for index in indices: 1707 if index in df: 1708 unique_indices[index] = unique_indices[index].union(set(df[index])) 1709 1710 return {key: list(val) for key, val in unique_indices.items()} 1711 1712 1713def df_is_chunk_generator(df: Any) -> bool: 1714 """ 1715 Determine whether to treat `df` as a chunk generator. 1716 1717 Note this should only be used in a context where generators are expected, 1718 as it will return `True` for any iterable. 1719 1720 Parameters 1721 ---------- 1722 The DataFrame or chunk generator to evaluate. 1723 1724 Returns 1725 ------- 1726 A `bool` indicating whether to treat `df` as a generator. 
1727 """ 1728 return ( 1729 not isinstance(df, (dict, list, str)) 1730 and 'DataFrame' not in str(type(df)) 1731 and isinstance(df, (Generator, Iterable, Iterator)) 1732 ) 1733 1734 1735def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1736 """ 1737 Return the Dask `npartitions` value for a given `chunksize`. 1738 """ 1739 if chunksize == -1: 1740 from meerschaum.config import get_config 1741 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1742 if chunksize is None: 1743 return 1 1744 return -1 * chunksize 1745 1746 1747def df_from_literal( 1748 pipe: Optional[mrsm.Pipe] = None, 1749 literal: Optional[str] = None, 1750 debug: bool = False 1751) -> 'pd.DataFrame': 1752 """ 1753 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1754 1755 Parameters 1756 ---------- 1757 pipe: Optional['meerschaum.Pipe'], default None 1758 The pipe which will consume the literal value. 1759 1760 Returns 1761 ------- 1762 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1763 and the literal as the value. 1764 """ 1765 from meerschaum.utils.packages import import_pandas 1766 from meerschaum.utils.warnings import error, warn 1767 from meerschaum.utils.debug import dprint 1768 from meerschaum.utils.dtypes import get_current_timestamp 1769 1770 if pipe is None or literal is None: 1771 error("Please provide a Pipe and a literal value") 1772 1773 dt_col = pipe.columns.get( 1774 'datetime', 1775 mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing') 1776 ) 1777 val_col = pipe.get_val_column(debug=debug) 1778 1779 val = literal 1780 if isinstance(literal, str): 1781 if debug: 1782 dprint(f"Received literal string: '{literal}'") 1783 import ast 1784 try: 1785 val = ast.literal_eval(literal) 1786 except Exception: 1787 warn( 1788 "Failed to parse value from string:\n" + f"{literal}" + 1789 "\n\nWill cast as a string instead."\ 1790 ) 1791 val = literal 1792 1793 now = get_current_timestamp(pipe.precision) 1794 pd = import_pandas() 1795 return pd.DataFrame({dt_col: [now], val_col: [val]}) 1796 1797 1798def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1799 """ 1800 Return the first valid Dask DataFrame partition (if possible). 1801 """ 1802 pdf = None 1803 for partition in ddf.partitions: 1804 try: 1805 pdf = partition.compute() 1806 except Exception: 1807 continue 1808 if len(pdf) > 0: 1809 return pdf 1810 _ = mrsm.attempt_import('partd', lazy=False) 1811 return ddf.compute() 1812 1813 1814def query_df( 1815 df: 'pd.DataFrame', 1816 params: Optional[Dict[str, Any]] = None, 1817 begin: Union[datetime, int, None] = None, 1818 end: Union[datetime, int, None] = None, 1819 datetime_column: Optional[str] = None, 1820 select_columns: Optional[List[str]] = None, 1821 omit_columns: Optional[List[str]] = None, 1822 inplace: bool = False, 1823 reset_index: bool = False, 1824 coerce_types: bool = False, 1825 debug: bool = False, 1826) -> 'pd.DataFrame': 1827 """ 1828 Query the dataframe with the params dictionary. 1829 1830 Parameters 1831 ---------- 1832 df: pd.DataFrame 1833 The DataFrame to query against. 1834 1835 params: Optional[Dict[str, Any]], default None 1836 The parameters dictionary to use for the query. 1837 1838 begin: Union[datetime, int, None], default None 1839 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1840 greater than or equal to this value. 


def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: Optional[str] = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse and assign to the value column.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import get_current_timestamp

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")

    dt_col = pipe.columns.get(
        'datetime',
        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
    )
    val_col = pipe.get_val_column(debug=debug)

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    now = get_current_timestamp(pipe.precision)
    pd = import_pandas()
    return pd.DataFrame({dt_col: [now], val_col: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()


def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        proto_in_ex_params = get_in_ex_params(params)
        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
            if proto_ex_vals:
                coerce_types = True
                break
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df
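
# Illustrative sketch (not part of the module source; requires `pyarrow` for
# the mask dtype): filter rows by a params dictionary on a DataFrame with a
# datetime column.
#
# >>> from datetime import datetime
# >>> import pandas as pd
# >>> df = pd.DataFrame({
# ...     'dt': [datetime(2025, 1, 1), datetime(2025, 1, 2)],
# ...     'color': ['red', 'blue'],
# ... })
# >>> query_df(df, {'color': 'red'}, datetime_column='dt', reset_index=True)
#           dt color
# 0 2025-01-01   red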


def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    double_precision: int = 15,
    geometry_format: str = 'geojson',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The structure of the serialized JSON (passed to `pandas.DataFrame.to_json()`).

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    double_precision: int, default 15
        The number of decimal places to use when encoding floating point values (maximum 15).

    geometry_format: str, default 'geojson'
        The serialization format for geometry data.
        Accepted values are `geojson`, `wkb_hex`, and `wkt`.

    Returns
    -------
    A JSON string.
    """
    import warnings
    import functools
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        serialize_bytes,
        serialize_decimal,
        serialize_geometry,
    )
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    geometry_cols = get_geometry_cols(df)
    geometry_cols_srids = {
        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
        for col in geometry_cols
    } if 'geodataframe' in str(type(df)).lower() else {}
    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
        df = df.copy()
    if 'geodataframe' in str(type(df)).lower():
        geometry_data = {
            col: df[col]
            for col in geometry_cols
        }
        df = pd.DataFrame({
            col: df[col]
            for col in df.columns
            if col not in geometry_cols
        })
        for col in geometry_cols:
            df[col] = pd.Series(ob for ob in geometry_data[col])
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    for col in bytes_cols:
        df[col] = df[col].apply(serialize_bytes)
    for col in numeric_cols:
        df[col] = df[col].apply(serialize_decimal)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for col in geometry_cols:
            srid = geometry_cols_srids.get(col, None) or None
            df[col] = pd.Series(
                serialize_geometry(val, geometry_format=geometry_format, srid=srid)
                for val in df[col]
            )
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        double_precision=double_precision,
        orient=orient,
        **kwargs
    )
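
# Illustrative sketch (not part of the module source): special types are
# serialized to JSON-safe representations first. The output shown assumes
# `serialize_decimal` renders the decimal as a string.
#
# >>> from decimal import Decimal
# >>> import pandas as pd
# >>> df = pd.DataFrame({'a': [1], 'price': [Decimal('1.10')]})
# >>> to_json(df)
# '[{"a":1,"price":"1.10"}]'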
2085 2086 date_unit: str, default 'us' 2087 The precision of the timestamps. 2088 2089 double_precision: int, default 15 2090 The number of decimal places to use when encoding floating point values (maximum 15). 2091 2092 geometry_format: str, default 'geojson' 2093 The serialization format for geometry data. 2094 Accepted values are `geojson`, `wkb_hex`, and `wkt`. 2095 2096 Returns 2097 ------- 2098 A JSON string. 2099 """ 2100 import warnings 2101 import functools 2102 from meerschaum.utils.packages import import_pandas 2103 from meerschaum.utils.dtypes import ( 2104 serialize_bytes, 2105 serialize_decimal, 2106 serialize_geometry, 2107 ) 2108 pd = import_pandas() 2109 uuid_cols = get_uuid_cols(df) 2110 bytes_cols = get_bytes_cols(df) 2111 numeric_cols = get_numeric_cols(df) 2112 geometry_cols = get_geometry_cols(df) 2113 geometry_cols_srids = { 2114 col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0') 2115 for col in geometry_cols 2116 } if 'geodataframe' in str(type(df)).lower() else {} 2117 if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols): 2118 df = df.copy() 2119 if 'geodataframe' in str(type(df)).lower(): 2120 geometry_data = { 2121 col: df[col] 2122 for col in geometry_cols 2123 } 2124 df = pd.DataFrame({ 2125 col: df[col] 2126 for col in df.columns 2127 if col not in geometry_cols 2128 }) 2129 for col in geometry_cols: 2130 df[col] = pd.Series(ob for ob in geometry_data[col]) 2131 for col in uuid_cols: 2132 df[col] = df[col].astype(str) 2133 for col in bytes_cols: 2134 df[col] = df[col].apply(serialize_bytes) 2135 for col in numeric_cols: 2136 df[col] = df[col].apply(serialize_decimal) 2137 with warnings.catch_warnings(): 2138 warnings.simplefilter("ignore") 2139 for col in geometry_cols: 2140 srid = geometry_cols_srids.get(col, None) or None 2141 df[col] = pd.Series( 2142 serialize_geometry(val, geometry_format=geometry_format, srid=srid) 2143 for val in df[col] 2144 ) 2145 return df.infer_objects(copy=False).fillna(pd.NA).to_json( 2146 date_format=date_format, 2147 date_unit=date_unit, 2148 double_precision=double_precision, 2149 orient=orient, 2150 **kwargs 2151 ) 2152 2153 2154def to_simple_lines(df: 'pd.DataFrame') -> str: 2155 """ 2156 Serialize a Pandas Dataframe as lines of simple dictionaries. 2157 2158 Parameters 2159 ---------- 2160 df: pd.DataFrame 2161 The dataframe to serialize into simple lines text. 2162 2163 Returns 2164 ------- 2165 A string of simple line dictionaries joined by newlines. 2166 """ 2167 from meerschaum.utils.misc import to_simple_dict 2168 if df is None or len(df) == 0: 2169 return '' 2170 2171 docs = df.to_dict(orient='records') 2172 return '\n'.join(to_simple_dict(doc) for doc in docs) 2173 2174 2175def parse_simple_lines(data: str) -> 'pd.DataFrame': 2176 """ 2177 Parse simple lines text into a DataFrame. 2178 2179 Parameters 2180 ---------- 2181 data: str 2182 The simple lines text to parse into a DataFrame. 2183 2184 Returns 2185 ------- 2186 A dataframe containing the rows serialized in `data`. 2187 """ 2188 from meerschaum.utils.misc import string_to_dict 2189 from meerschaum.utils.packages import import_pandas 2190 pd = import_pandas() 2191 lines = data.splitlines() 2192 try: 2193 docs = [string_to_dict(line) for line in lines] 2194 df = pd.DataFrame(docs) 2195 except Exception: 2196 df = None 2197 2198 if df is None: 2199 raise ValueError("Cannot parse simple lines into a dataframe.") 2200 2201 return df
25def add_missing_cols_to_df( 26 df: 'pd.DataFrame', 27 dtypes: Dict[str, Any], 28) -> 'pd.DataFrame': 29 """ 30 Add columns from the dtypes dictionary as null columns to a new DataFrame. 31 32 Parameters 33 ---------- 34 df: pd.DataFrame 35 The dataframe we should copy and add null columns. 36 37 dtypes: 38 The data types dictionary which may contain keys not present in `df.columns`. 39 40 Returns 41 ------- 42 A new `DataFrame` with the keys from `dtypes` added as null columns. 43 If `df.dtypes` is the same as `dtypes`, then return a reference to `df`. 44 NOTE: This will not ensure that dtypes are enforced! 45 46 Examples 47 -------- 48 >>> import pandas as pd 49 >>> df = pd.DataFrame([{'a': 1}]) 50 >>> dtypes = {'b': 'Int64'} 51 >>> add_missing_cols_to_df(df, dtypes) 52 a b 53 0 1 <NA> 54 >>> add_missing_cols_to_df(df, dtypes).dtypes 55 a int64 56 b Int64 57 dtype: object 58 >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes 59 a int64 60 dtype: object 61 >>> 62 """ 63 if set(df.columns) == set(dtypes): 64 return df 65 66 from meerschaum.utils.packages import attempt_import 67 from meerschaum.utils.dtypes import to_pandas_dtype 68 pandas = attempt_import('pandas') 69 70 def build_series(dtype: str): 71 return pandas.Series([], dtype=to_pandas_dtype(dtype)) 72 73 assign_kwargs = { 74 str(col): build_series(str(typ)) 75 for col, typ in dtypes.items() 76 if col not in df.columns 77 } 78 df_with_cols = df.assign(**assign_kwargs) 79 for col in assign_kwargs: 80 df_with_cols[col] = df_with_cols[col].fillna(pandas.NA) 81 return df_with_cols
Add columns from the dtypes dictionary as null columns to a new DataFrame.
Parameters
- df (pd.DataFrame): The dataframe to copy and to which the null columns are added.
- dtypes (Dict[str, Any]): The data types dictionary, which may contain keys not present in `df.columns`.
Returns
- A new `DataFrame` with the keys from `dtypes` added as null columns.
- If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
- NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
a b
0 1 <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a int64
b Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a int64
dtype: object
>>>
84def filter_unseen_df( 85 old_df: 'pd.DataFrame', 86 new_df: 'pd.DataFrame', 87 safe_copy: bool = True, 88 dtypes: Optional[Dict[str, Any]] = None, 89 include_unchanged_columns: bool = False, 90 coerce_mixed_numerics: bool = True, 91 debug: bool = False, 92) -> 'pd.DataFrame': 93 """ 94 Left join two DataFrames to find the newest unseen data. 95 96 Parameters 97 ---------- 98 old_df: 'pd.DataFrame' 99 The original (target) dataframe. Acts as a filter on the `new_df`. 100 101 new_df: 'pd.DataFrame' 102 The fetched (source) dataframe. Rows that are contained in `old_df` are removed. 103 104 safe_copy: bool, default True 105 If `True`, create a copy before comparing and modifying the dataframes. 106 Setting to `False` may mutate the DataFrames. 107 108 dtypes: Optional[Dict[str, Any]], default None 109 Optionally specify the datatypes of the dataframe. 110 111 include_unchanged_columns: bool, default False 112 If `True`, include columns which haven't changed on rows which have changed. 113 114 coerce_mixed_numerics: bool, default True 115 If `True`, cast mixed integer and float columns between the old and new dataframes into 116 numeric values (`decimal.Decimal`). 117 118 debug: bool, default False 119 Verbosity toggle. 120 121 Returns 122 ------- 123 A pandas dataframe of the new, unseen rows in `new_df`. 124 125 Examples 126 -------- 127 ```python 128 >>> import pandas as pd 129 >>> df1 = pd.DataFrame({'a': [1,2]}) 130 >>> df2 = pd.DataFrame({'a': [2,3]}) 131 >>> filter_unseen_df(df1, df2) 132 a 133 0 3 134 135 ``` 136 137 """ 138 if old_df is None: 139 return new_df 140 141 if safe_copy: 142 old_df = old_df.copy() 143 new_df = new_df.copy() 144 145 import json 146 import functools 147 import traceback 148 from meerschaum.utils.warnings import warn 149 from meerschaum.utils.packages import import_pandas, attempt_import 150 from meerschaum.utils.dtypes import ( 151 to_pandas_dtype, 152 are_dtypes_equal, 153 attempt_cast_to_numeric, 154 attempt_cast_to_uuid, 155 attempt_cast_to_bytes, 156 attempt_cast_to_geometry, 157 coerce_timezone, 158 serialize_decimal, 159 ) 160 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 161 pd = import_pandas(debug=debug) 162 is_dask = 'dask' in new_df.__module__ 163 if is_dask: 164 pandas = attempt_import('pandas') 165 _ = attempt_import('partd', lazy=False) 166 dd = attempt_import('dask.dataframe') 167 merge = dd.merge 168 NA = pandas.NA 169 else: 170 merge = pd.merge 171 NA = pd.NA 172 173 new_df_dtypes = dict(new_df.dtypes) 174 old_df_dtypes = dict(old_df.dtypes) 175 176 same_cols = set(new_df.columns) == set(old_df.columns) 177 if not same_cols: 178 new_df = add_missing_cols_to_df(new_df, old_df_dtypes) 179 old_df = add_missing_cols_to_df(old_df, new_df_dtypes) 180 181 new_types_missing_from_old = { 182 col: typ 183 for col, typ in new_df_dtypes.items() 184 if col not in old_df_dtypes 185 } 186 old_types_missing_from_new = { 187 col: typ 188 for col, typ in new_df_dtypes.items() 189 if col not in old_df_dtypes 190 } 191 old_df_dtypes.update(new_types_missing_from_old) 192 new_df_dtypes.update(old_types_missing_from_new) 193 194 ### Edge case: two empty lists cast to DFs. 195 elif len(new_df.columns) == 0: 196 return new_df 197 198 try: 199 ### Order matters when checking equality. 200 new_df = new_df[old_df.columns] 201 202 except Exception as e: 203 warn( 204 "Was not able to cast old columns onto new DataFrame. " + 205 f"Are both DataFrames the same shape? 
Error:\n{e}", 206 stacklevel=3, 207 ) 208 return new_df[list(new_df_dtypes.keys())] 209 210 ### assume the old_df knows what it's doing, even if it's technically wrong. 211 if dtypes is None: 212 dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} 213 214 dtypes = { 215 col: to_pandas_dtype(typ) 216 for col, typ in dtypes.items() 217 if col in new_df_dtypes and col in old_df_dtypes 218 } 219 for col, typ in new_df_dtypes.items(): 220 if col not in dtypes: 221 dtypes[col] = typ 222 223 numeric_cols_precisions_scales = { 224 col: get_numeric_precision_scale(None, typ) 225 for col, typ in dtypes.items() 226 if col and str(typ).lower().startswith('numeric') 227 } 228 229 dt_dtypes = { 230 col: typ 231 for col, typ in dtypes.items() 232 if are_dtypes_equal(typ, 'datetime') 233 } 234 non_dt_dtypes = { 235 col: typ 236 for col, typ in dtypes.items() 237 if col not in dt_dtypes 238 } 239 240 cast_non_dt_cols = True 241 try: 242 new_df = new_df.astype(non_dt_dtypes) 243 cast_non_dt_cols = False 244 except Exception as e: 245 warn( 246 f"Was not able to cast the new DataFrame to the given dtypes.\n{e}" 247 ) 248 249 cast_dt_cols = True 250 try: 251 for col, typ in dt_dtypes.items(): 252 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 253 strip_utc = ( 254 _dtypes_col_dtype.startswith('datetime64') 255 and 'utc' not in _dtypes_col_dtype.lower() 256 ) 257 if col in old_df.columns: 258 old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc) 259 if col in new_df.columns: 260 new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc) 261 cast_dt_cols = False 262 except Exception as e: 263 warn(f"Could not cast datetime columns:\n{e}") 264 265 cast_cols = cast_dt_cols or cast_non_dt_cols 266 267 new_numeric_cols_existing = get_numeric_cols(new_df) 268 old_numeric_cols = get_numeric_cols(old_df) 269 for col, typ in {k: v for k, v in dtypes.items()}.items(): 270 if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')): 271 new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float') 272 new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int') 273 new_is_numeric = col in new_numeric_cols_existing 274 old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float') 275 old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int') 276 old_is_numeric = col in old_numeric_cols 277 278 if ( 279 coerce_mixed_numerics 280 and 281 (new_is_float or new_is_int or new_is_numeric) 282 and 283 (old_is_float or old_is_int or old_is_numeric) 284 ): 285 dtypes[col] = attempt_cast_to_numeric 286 cast_cols = True 287 continue 288 289 ### Fallback to object if the types don't match. 290 warn( 291 f"Detected different types for '{col}' " 292 + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), " 293 + "falling back to 'object'..." 
294 ) 295 dtypes[col] = 'object' 296 cast_cols = True 297 298 if cast_cols: 299 for col, dtype in dtypes.items(): 300 if col in new_df.columns: 301 try: 302 new_df[col] = ( 303 new_df[col].astype(dtype) 304 if not callable(dtype) 305 else new_df[col].apply(dtype) 306 ) 307 except Exception as e: 308 warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}") 309 310 serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str) 311 new_json_cols = get_json_cols(new_df) 312 old_json_cols = get_json_cols(old_df) 313 json_cols = set(new_json_cols + old_json_cols) 314 for json_col in old_json_cols: 315 old_df[json_col] = old_df[json_col].apply(serializer) 316 for json_col in new_json_cols: 317 new_df[json_col] = new_df[json_col].apply(serializer) 318 319 new_numeric_cols = get_numeric_cols(new_df) 320 numeric_cols = set(new_numeric_cols + old_numeric_cols) 321 for numeric_col in old_numeric_cols: 322 old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal) 323 for numeric_col in new_numeric_cols: 324 new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal) 325 326 old_dt_cols = [ 327 col 328 for col, typ in old_df.dtypes.items() 329 if are_dtypes_equal(str(typ), 'datetime') 330 ] 331 for col in old_dt_cols: 332 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 333 strip_utc = ( 334 _dtypes_col_dtype.startswith('datetime64') 335 and 'utc' not in _dtypes_col_dtype.lower() 336 ) 337 old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc) 338 339 new_dt_cols = [ 340 col 341 for col, typ in new_df.dtypes.items() 342 if are_dtypes_equal(str(typ), 'datetime') 343 ] 344 for col in new_dt_cols: 345 _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime')) 346 strip_utc = ( 347 _dtypes_col_dtype.startswith('datetime64') 348 and 'utc' not in _dtypes_col_dtype.lower() 349 ) 350 new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc) 351 352 old_uuid_cols = get_uuid_cols(old_df) 353 new_uuid_cols = get_uuid_cols(new_df) 354 uuid_cols = set(new_uuid_cols + old_uuid_cols) 355 356 old_bytes_cols = get_bytes_cols(old_df) 357 new_bytes_cols = get_bytes_cols(new_df) 358 bytes_cols = set(new_bytes_cols + old_bytes_cols) 359 360 old_geometry_cols = get_geometry_cols(old_df) 361 new_geometry_cols = get_geometry_cols(new_df) 362 geometry_cols = set(new_geometry_cols + old_geometry_cols) 363 364 na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$' 365 joined_df = merge( 366 new_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA), 367 old_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA), 368 how='left', 369 on=None, 370 indicator=True, 371 ) 372 changed_rows_mask = (joined_df['_merge'] == 'left_only') 373 new_cols = list(new_df_dtypes) 374 delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True) 375 376 delta_json_cols = get_json_cols(delta_df) 377 for json_col in json_cols: 378 if ( 379 json_col in delta_json_cols 380 or json_col not in delta_df.columns 381 ): 382 continue 383 try: 384 delta_df[json_col] = delta_df[json_col].apply( 385 lambda x: (json.loads(x) if isinstance(x, str) else x) 386 ) 387 except Exception: 388 warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}") 389 390 delta_numeric_cols = get_numeric_cols(delta_df) 391 for numeric_col in numeric_cols: 392 if ( 393 numeric_col in delta_numeric_cols 394 or numeric_col not in delta_df.columns 395 ): 396 continue 397 try: 398 delta_df[numeric_col] = 
delta_df[numeric_col].apply( 399 functools.partial( 400 attempt_cast_to_numeric, 401 quantize=True, 402 precision=numeric_cols_precisions_scales.get(numeric_col, (None, None)[0]), 403 scale=numeric_cols_precisions_scales.get(numeric_col, (None, None)[1]), 404 ) 405 ) 406 except Exception: 407 warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}") 408 409 delta_uuid_cols = get_uuid_cols(delta_df) 410 for uuid_col in uuid_cols: 411 if ( 412 uuid_col in delta_uuid_cols 413 or uuid_col not in delta_df.columns 414 ): 415 continue 416 try: 417 delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid) 418 except Exception: 419 warn(f"Unable to parse numeric column '{uuid_col}':\n{traceback.format_exc()}") 420 421 delta_bytes_cols = get_bytes_cols(delta_df) 422 for bytes_col in bytes_cols: 423 if ( 424 bytes_col in delta_bytes_cols 425 or bytes_col not in delta_df.columns 426 ): 427 continue 428 try: 429 delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes) 430 except Exception: 431 warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}") 432 433 delta_geometry_cols = get_geometry_cols(delta_df) 434 for geometry_col in geometry_cols: 435 if ( 436 geometry_col in delta_geometry_cols 437 or geometry_col not in delta_df.columns 438 ): 439 continue 440 try: 441 delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col]) 442 except Exception: 443 warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}") 444 445 return delta_df
Left join two DataFrames to find the newest unseen data.
Parameters
- old_df (pd.DataFrame): The original (target) dataframe. Acts as a filter on the `new_df`.
- new_df (pd.DataFrame): The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames.
- dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- include_unchanged_columns (bool, default False): If `True`, include columns which haven't changed on rows which have changed.
- coerce_mixed_numerics (bool, default True): If `True`, cast mixed integer and float columns between the old and new dataframes into numeric values (`decimal.Decimal`).
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas dataframe of the new, unseen rows in `new_df`.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
a
0 3
448def parse_df_datetimes( 449 df: 'pd.DataFrame', 450 ignore_cols: Optional[Iterable[str]] = None, 451 strip_timezone: bool = False, 452 chunksize: Optional[int] = None, 453 dtype_backend: str = 'numpy_nullable', 454 ignore_all: bool = False, 455 precision_unit: Optional[str] = None, 456 coerce_utc: bool = True, 457 debug: bool = False, 458) -> 'pd.DataFrame': 459 """ 460 Parse a pandas DataFrame for datetime columns and cast as datetimes. 461 462 Parameters 463 ---------- 464 df: pd.DataFrame 465 The pandas DataFrame to parse. 466 467 ignore_cols: Optional[Iterable[str]], default None 468 If provided, do not attempt to coerce these columns as datetimes. 469 470 strip_timezone: bool, default False 471 If `True`, remove the UTC `tzinfo` property. 472 473 chunksize: Optional[int], default None 474 If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe. 475 476 dtype_backend: str, default 'numpy_nullable' 477 If `df` is not a DataFrame and new one needs to be constructed, 478 use this as the datatypes backend. 479 Accepted values are 'numpy_nullable' and 'pyarrow'. 480 481 ignore_all: bool, default False 482 If `True`, do not attempt to cast any columns to datetimes. 483 484 precision_unit: Optional[str], default None 485 If provided, enforce the given precision on the coerced datetime columns. 486 487 coerce_utc: bool, default True 488 Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`). 489 490 debug: bool, default False 491 Verbosity toggle. 492 493 Returns 494 ------- 495 A new pandas DataFrame with the determined datetime columns 496 (usually ISO strings) cast as datetimes. 497 498 Examples 499 -------- 500 ```python 501 >>> import pandas as pd 502 >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 503 >>> df.dtypes 504 a object 505 dtype: object 506 >>> df2 = parse_df_datetimes(df) 507 >>> df2.dtypes 508 a datetime64[us, UTC] 509 dtype: object 510 511 ``` 512 513 """ 514 from meerschaum.utils.packages import import_pandas, attempt_import 515 from meerschaum.utils.debug import dprint 516 from meerschaum.utils.warnings import warn 517 from meerschaum.utils.misc import items_str 518 from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES 519 import traceback 520 521 pd = import_pandas() 522 pandas = attempt_import('pandas') 523 pd_name = pd.__name__ 524 using_dask = 'dask' in pd_name 525 df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__) 526 dask_dataframe = None 527 if using_dask or df_is_dask: 528 npartitions = chunksize_to_npartitions(chunksize) 529 dask_dataframe = attempt_import('dask.dataframe') 530 531 ### if df is a dict, build DataFrame 532 if isinstance(df, pandas.DataFrame): 533 pdf = df 534 elif df_is_dask and isinstance(df, dask_dataframe.DataFrame): 535 pdf = get_first_valid_dask_partition(df) 536 else: 537 if debug: 538 dprint(f"df is of type '{type(df)}'. 
Building {pd.DataFrame}...") 539 540 if using_dask: 541 if isinstance(df, list): 542 keys = set() 543 for doc in df: 544 for key in doc: 545 keys.add(key) 546 df = pd.DataFrame.from_dict( 547 { 548 k: [ 549 doc.get(k, None) 550 for doc in df 551 ] for k in keys 552 }, 553 npartitions=npartitions, 554 ) 555 elif isinstance(df, dict): 556 df = pd.DataFrame.from_dict(df, npartitions=npartitions) 557 elif 'pandas.core.frame.DataFrame' in str(type(df)): 558 df = pd.from_pandas(df, npartitions=npartitions) 559 else: 560 raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.") 561 pandas = attempt_import('pandas') 562 pdf = get_first_valid_dask_partition(df) 563 564 else: 565 df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend) 566 pdf = df 567 568 ### skip parsing if DataFrame is empty 569 if len(pdf) == 0: 570 if debug: 571 dprint("df is empty. Returning original DataFrame without casting datetime columns...") 572 return df 573 574 ignore_cols = set( 575 (ignore_cols or []) + [ 576 col 577 for col, dtype in pdf.dtypes.items() 578 if 'datetime' in str(dtype) 579 ] 580 ) 581 cols_to_inspect = [ 582 col 583 for col in pdf.columns 584 if col not in ignore_cols 585 ] if not ignore_all else [] 586 587 if len(cols_to_inspect) == 0: 588 if debug: 589 dprint("All columns are ignored, skipping datetime detection...") 590 return df.infer_objects(copy=False).fillna(pandas.NA) 591 592 ### apply regex to columns to determine which are ISO datetimes 593 iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+' 594 dt_mask = pdf[cols_to_inspect].astype(str).apply( 595 lambda s: s.str.match(iso_dt_regex).all() 596 ) 597 598 ### list of datetime column names 599 datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]] 600 if not datetime_cols: 601 if debug: 602 dprint("No columns detected as datetimes, returning...") 603 return df.infer_objects(copy=False).fillna(pandas.NA) 604 605 if debug: 606 dprint("Converting columns to datetimes: " + str(datetime_cols)) 607 608 def _parse_to_datetime(x): 609 return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc) 610 611 try: 612 if not using_dask: 613 df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime) 614 else: 615 df[datetime_cols] = df[datetime_cols].apply( 616 _parse_to_datetime, 617 utc=True, 618 axis=1, 619 meta={ 620 col: MRSM_PD_DTYPES['datetime'] 621 for col in datetime_cols 622 } 623 ) 624 except Exception: 625 warn( 626 f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n" 627 + f"{traceback.format_exc()}" 628 ) 629 630 if strip_timezone: 631 for dt in datetime_cols: 632 try: 633 df[dt] = df[dt].dt.tz_localize(None) 634 except Exception: 635 warn( 636 f"Unable to convert column '{dt}' to naive datetime:\n" 637 + f"{traceback.format_exc()}" 638 ) 639 640 return df.fillna(pandas.NA)
Parse a pandas DataFrame for datetime columns and cast as datetimes.
Parameters
- df (pd.DataFrame): The pandas DataFrame to parse.
- ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
- strip_timezone (bool, default False): If `True`, remove the UTC `tzinfo` property.
- chunksize (Optional[int], default None): If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
- dtype_backend (str, default 'numpy_nullable'): If `df` is not a DataFrame and a new one needs to be constructed, use this as the datatypes backend. Accepted values are `'numpy_nullable'` and `'pyarrow'`.
- ignore_all (bool, default False): If `True`, do not attempt to cast any columns to datetimes.
- precision_unit (Optional[str], default None): If provided, enforce the given precision on the coerced datetime columns.
- coerce_utc (bool, default True): Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
- debug (bool, default False): Verbosity toggle.
Returns
- A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> df.dtypes
a object
dtype: object
>>> df2 = parse_df_datetimes(df)
>>> df2.dtypes
a datetime64[us, UTC]
dtype: object
643def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]: 644 """ 645 Get the columns which contain unhashable objects from a Pandas DataFrame. 646 647 Parameters 648 ---------- 649 df: pd.DataFrame 650 The DataFrame which may contain unhashable objects. 651 652 Returns 653 ------- 654 A list of columns. 655 """ 656 if df is None: 657 return [] 658 if len(df) == 0: 659 return [] 660 661 is_dask = 'dask' in df.__module__ 662 if is_dask: 663 from meerschaum.utils.packages import attempt_import 664 pandas = attempt_import('pandas') 665 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 666 return [ 667 col for col, val in df.iloc[0].items() 668 if not isinstance(val, Hashable) 669 ]
Get the columns which contain unhashable objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns.
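A minimal sketch (column names are illustrative); note that only the first row is inspected, so mixed columns may be missed:
```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame({'a': [1, 2], 'tags': [['x'], ['y', 'z']]})
print(get_unhashable_cols(df))  # ['tags'] (lists are not hashable)
```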
672def get_json_cols(df: 'pd.DataFrame') -> List[str]: 673 """ 674 Get the columns which contain unhashable objects from a Pandas DataFrame. 675 676 Parameters 677 ---------- 678 df: pd.DataFrame 679 The DataFrame which may contain unhashable objects. 680 681 Returns 682 ------- 683 A list of columns to be encoded as JSON. 684 """ 685 if df is None: 686 return [] 687 688 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 689 if is_dask: 690 df = get_first_valid_dask_partition(df) 691 692 if len(df) == 0: 693 return [] 694 695 cols_indices = { 696 col: df[col].first_valid_index() 697 for col in df.columns 698 } 699 return [ 700 col 701 for col, ix in cols_indices.items() 702 if ( 703 ix is not None 704 and isinstance(df.loc[ix][col], (dict, list)) 705 ) 706 ]
Get the columns which should be encoded as JSON (i.e. which contain dicts or lists) from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain dicts or lists.
Returns
- A list of columns to be encoded as JSON.
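A minimal sketch of detecting dict-valued columns for JSON encoding (the `meta` column name is illustrative):
```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({'n': [1, 2], 'meta': [{'color': 'red'}, {'color': 'blue'}]})
print(get_json_cols(df))  # ['meta']
```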
709def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 710 """ 711 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 712 713 Parameters 714 ---------- 715 df: pd.DataFrame 716 The DataFrame which may contain decimal objects. 717 718 Returns 719 ------- 720 A list of columns to treat as numerics. 721 """ 722 if df is None: 723 return [] 724 from decimal import Decimal 725 is_dask = 'dask' in df.__module__ 726 if is_dask: 727 df = get_first_valid_dask_partition(df) 728 729 if len(df) == 0: 730 return [] 731 732 cols_indices = { 733 col: df[col].first_valid_index() 734 for col in df.columns 735 } 736 return [ 737 col 738 for col, ix in cols_indices.items() 739 if ( 740 ix is not None 741 and 742 isinstance(df.loc[ix][col], Decimal) 743 ) 744 ]
Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
- A list of columns to treat as numerics.
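Detection is based on the first valid (non-null) value in each column; a minimal sketch:
```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('1.10'), Decimal('2.25')], 'qty': [1, 2]})
print(get_numeric_cols(df))  # ['price'] (only Decimal values count as numeric)
```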
747def get_bool_cols(df: 'pd.DataFrame') -> List[str]: 748 """ 749 Get the columns which contain `bool` objects from a Pandas DataFrame. 750 751 Parameters 752 ---------- 753 df: pd.DataFrame 754 The DataFrame which may contain bools. 755 756 Returns 757 ------- 758 A list of columns to treat as bools. 759 """ 760 if df is None: 761 return [] 762 763 is_dask = 'dask' in df.__module__ 764 if is_dask: 765 df = get_first_valid_dask_partition(df) 766 767 if len(df) == 0: 768 return [] 769 770 from meerschaum.utils.dtypes import are_dtypes_equal 771 772 return [ 773 col 774 for col, typ in df.dtypes.items() 775 if are_dtypes_equal(str(typ), 'bool') 776 ]
Get the columns which contain bool objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bools.
Returns
- A list of columns to treat as bools.
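Unlike the other detectors, this one checks dtypes rather than values; a minimal sketch:
```python
import pandas as pd
from meerschaum.utils.dataframe import get_bool_cols

df = pd.DataFrame({'flag': [True, False], 'x': [1, 2]})
print(get_bool_cols(df))  # ['flag']
```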
779def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 780 """ 781 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 782 783 Parameters 784 ---------- 785 df: pd.DataFrame 786 The DataFrame which may contain UUID objects. 787 788 Returns 789 ------- 790 A list of columns to treat as UUIDs. 791 """ 792 if df is None: 793 return [] 794 from uuid import UUID 795 is_dask = 'dask' in df.__module__ 796 if is_dask: 797 df = get_first_valid_dask_partition(df) 798 799 if len(df) == 0: 800 return [] 801 802 cols_indices = { 803 col: df[col].first_valid_index() 804 for col in df.columns 805 } 806 return [ 807 col 808 for col, ix in cols_indices.items() 809 if ( 810 ix is not None 811 and 812 isinstance(df.loc[ix][col], UUID) 813 ) 814 ]
Get the columns which contain uuid.UUID objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
- A list of columns to treat as UUIDs.
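A minimal sketch, again sampling the first valid value per column:
```python
import uuid
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'id': [uuid.uuid4(), uuid.uuid4()], 'x': [1, 2]})
print(get_uuid_cols(df))  # ['id']
```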
817def get_datetime_cols( 818 df: 'pd.DataFrame', 819 timezone_aware: bool = True, 820 timezone_naive: bool = True, 821 with_tz_precision: bool = False, 822) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]: 823 """ 824 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 825 826 Parameters 827 ---------- 828 df: pd.DataFrame 829 The DataFrame which may contain `datetime` or `Timestamp` objects. 830 831 timezone_aware: bool, default True 832 If `True`, include timezone-aware datetime columns. 833 834 timezone_naive: bool, default True 835 If `True`, include timezone-naive datetime columns. 836 837 with_tz_precision: bool, default False 838 If `True`, return a dictionary mapping column names to tuples in the form 839 `(timezone, precision)`. 840 841 Returns 842 ------- 843 A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples 844 (if `with_tz_precision` is `True`). 845 """ 846 if not timezone_aware and not timezone_naive: 847 raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.") 848 849 if df is None: 850 return [] if not with_tz_precision else {} 851 852 from datetime import datetime 853 from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES 854 is_dask = 'dask' in df.__module__ 855 if is_dask: 856 df = get_first_valid_dask_partition(df) 857 858 def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]: 859 """ 860 Extract the tz + precision tuple from a dtype string. 861 """ 862 meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '') 863 tz = ( 864 None 865 if ',' not in meta_str 866 else meta_str.split(',', maxsplit=1)[-1] 867 ) 868 precision_abbreviation = ( 869 meta_str 870 if ',' not in meta_str 871 else meta_str.split(',')[0] 872 ) 873 precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation] 874 return tz, precision 875 876 def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]: 877 """ 878 Return the tz + precision tuple from a Python datetime object. 
879 """ 880 return dt.tzname(), 'microsecond' 881 882 known_dt_cols_types = { 883 col: str(typ) 884 for col, typ in df.dtypes.items() 885 if are_dtypes_equal('datetime', str(typ)) 886 } 887 888 known_dt_cols_tuples = { 889 col: get_tz_precision_from_dtype(typ) 890 for col, typ in known_dt_cols_types.items() 891 } 892 893 if len(df) == 0: 894 return ( 895 list(known_dt_cols_types) 896 if not with_tz_precision 897 else known_dt_cols_tuples 898 ) 899 900 cols_indices = { 901 col: df[col].first_valid_index() 902 for col in df.columns 903 if col not in known_dt_cols_types 904 } 905 pydt_cols_tuples = { 906 col: get_tz_precision_from_datetime(sample_val) 907 for col, ix in cols_indices.items() 908 if ( 909 ix is not None 910 and 911 isinstance((sample_val := df.loc[ix][col]), datetime) 912 ) 913 } 914 915 dt_cols_tuples = { 916 **known_dt_cols_tuples, 917 **pydt_cols_tuples 918 } 919 920 all_dt_cols_tuples = { 921 col: dt_cols_tuples[col] 922 for col in df.columns 923 if col in dt_cols_tuples 924 } 925 if timezone_aware and timezone_naive: 926 return ( 927 list(all_dt_cols_tuples) 928 if not with_tz_precision 929 else all_dt_cols_tuples 930 ) 931 932 known_timezone_aware_dt_cols = [ 933 col 934 for col in known_dt_cols_types 935 if getattr(df[col], 'tz', None) is not None 936 ] 937 timezone_aware_pydt_cols_tuples = { 938 col: (tz, precision) 939 for col, (tz, precision) in pydt_cols_tuples.items() 940 if df.loc[cols_indices[col]][col].tzinfo is not None 941 } 942 timezone_aware_dt_cols_set = set( 943 known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples) 944 ) 945 timezone_aware_cols_tuples = { 946 col: (tz, precision) 947 for col, (tz, precision) in all_dt_cols_tuples.items() 948 if col in timezone_aware_dt_cols_set 949 } 950 timezone_naive_cols_tuples = { 951 col: (tz, precision) 952 for col, (tz, precision) in all_dt_cols_tuples.items() 953 if col not in timezone_aware_dt_cols_set 954 } 955 956 if timezone_aware: 957 return ( 958 list(timezone_aware_cols_tuples) 959 if not with_tz_precision 960 else timezone_aware_cols_tuples 961 ) 962 963 return ( 964 list(timezone_naive_cols_tuples) 965 if not with_tz_precision 966 else timezone_naive_cols_tuples 967 )
Get the columns which contain datetime or Timestamp objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain `datetime` or `Timestamp` objects.
- timezone_aware (bool, default True): If `True`, include timezone-aware datetime columns.
- timezone_naive (bool, default True): If `True`, include timezone-naive datetime columns.
- with_tz_precision (bool, default False): If `True`, return a dictionary mapping column names to tuples in the form `(timezone, precision)`.
Returns
- A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples (if `with_tz_precision` is `True`).
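A hedged sketch of filtering by timezone awareness (exact dtype and precision strings depend on your pandas version):
```python
from datetime import datetime, timezone
import pandas as pd
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({
    'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
    'naive': [datetime(2025, 1, 1)],
})
print(get_datetime_cols(df))                          # ['aware', 'naive']
print(get_datetime_cols(df, timezone_naive=False))    # ['aware']
print(get_datetime_cols(df, with_tz_precision=True))  # e.g. {'aware': ('UTC', 'microsecond'), ...}
```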
970def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 971 """ 972 Return a dictionary mapping datetime columns to specific types strings. 973 974 Parameters 975 ---------- 976 df: pd.DataFrame 977 The DataFrame which may contain datetime columns. 978 979 Returns 980 ------- 981 A dictionary mapping the datetime columns' names to dtype strings 982 (containing timezone and precision metadata). 983 984 Examples 985 -------- 986 >>> from datetime import datetime, timezone 987 >>> import pandas as pd 988 >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]}) 989 >>> get_datetime_cols_types(df) 990 {'dt_tz_aware': 'datetime64[us, UTC]'} 991 >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]}) 992 >>> get_datetime_cols_types(df) 993 {'distant_dt': 'datetime64[us]'} 994 >>> df = pd.DataFrame({'dt_second': datetime(2025, 1, 1)}) 995 >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]') 996 >>> get_datetime_cols_types(df) 997 {'dt_second': 'datetime64[s]'} 998 """ 999 from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS 1000 dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True) 1001 if not dt_cols_tuples: 1002 return {} 1003 1004 return { 1005 col: ( 1006 f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]" 1007 if tz is None 1008 else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]" 1009 ) 1010 for col, (tz, precision) in dt_cols_tuples.items() 1011 }
Return a dictionary mapping datetime columns to specific types strings.
Parameters
- df (pd.DataFrame): The DataFrame which may contain datetime columns.
Returns
- A dictionary mapping the datetime columns' names to dtype strings (containing timezone and precision metadata).
Examples
>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
>>> get_datetime_cols_types(df)
{'dt_tz_aware': 'datetime64[us, UTC]'}
>>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
>>> get_datetime_cols_types(df)
{'distant_dt': 'datetime64[us]'}
>>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
>>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
>>> get_datetime_cols_types(df)
{'dt_second': 'datetime64[s]'}
1014def get_date_cols(df: 'pd.DataFrame') -> List[str]: 1015 """ 1016 Get the `date` columns from a Pandas DataFrame. 1017 1018 Parameters 1019 ---------- 1020 df: pd.DataFrame 1021 The DataFrame which may contain dates. 1022 1023 Returns 1024 ------- 1025 A list of columns to treat as dates. 1026 """ 1027 from meerschaum.utils.dtypes import are_dtypes_equal 1028 if df is None: 1029 return [] 1030 1031 is_dask = 'dask' in df.__module__ 1032 if is_dask: 1033 df = get_first_valid_dask_partition(df) 1034 1035 known_date_cols = [ 1036 col 1037 for col, typ in df.dtypes.items() 1038 if are_dtypes_equal(typ, 'date') 1039 ] 1040 1041 if len(df) == 0: 1042 return known_date_cols 1043 1044 cols_indices = { 1045 col: df[col].first_valid_index() 1046 for col in df.columns 1047 if col not in known_date_cols 1048 } 1049 object_date_cols = [ 1050 col 1051 for col, ix in cols_indices.items() 1052 if ( 1053 ix is not None 1054 and isinstance(df.loc[ix][col], date) 1055 ) 1056 ] 1057 1058 all_date_cols = set(known_date_cols + object_date_cols) 1059 1060 return [ 1061 col 1062 for col in df.columns 1063 if col in all_date_cols 1064 ]
Get the date columns from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain dates.
Returns
- A list of columns to treat as dates.
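A minimal sketch; `date` objects typically live in object-dtype columns and are detected by value:
```python
from datetime import date
import pandas as pd
from meerschaum.utils.dataframe import get_date_cols

df = pd.DataFrame({'day': [date(2025, 1, 1), date(2025, 1, 2)], 'x': [1, 2]})
print(get_date_cols(df))  # ['day']
```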
1067def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 1068 """ 1069 Get the columns which contain bytes strings from a Pandas DataFrame. 1070 1071 Parameters 1072 ---------- 1073 df: pd.DataFrame 1074 The DataFrame which may contain bytes strings. 1075 1076 Returns 1077 ------- 1078 A list of columns to treat as bytes. 1079 """ 1080 if df is None: 1081 return [] 1082 1083 is_dask = 'dask' in df.__module__ 1084 if is_dask: 1085 df = get_first_valid_dask_partition(df) 1086 1087 known_bytes_cols = [ 1088 col 1089 for col, typ in df.dtypes.items() 1090 if str(typ) == 'binary[pyarrow]' 1091 ] 1092 1093 if len(df) == 0: 1094 return known_bytes_cols 1095 1096 cols_indices = { 1097 col: df[col].first_valid_index() 1098 for col in df.columns 1099 if col not in known_bytes_cols 1100 } 1101 object_bytes_cols = [ 1102 col 1103 for col, ix in cols_indices.items() 1104 if ( 1105 ix is not None 1106 and isinstance(df.loc[ix][col], bytes) 1107 ) 1108 ] 1109 1110 all_bytes_cols = set(known_bytes_cols + object_bytes_cols) 1111 1112 return [ 1113 col 1114 for col in df.columns 1115 if col in all_bytes_cols 1116 ]
Get the columns which contain bytes strings from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
- A list of columns to treat as bytes.
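A minimal sketch; both `binary[pyarrow]` columns and object columns of `bytes` values are detected:
```python
import pandas as pd
from meerschaum.utils.dataframe import get_bytes_cols

df = pd.DataFrame({'blob': [b'\x00\x01', b'\x02'], 'x': [1, 2]})
print(get_bytes_cols(df))  # ['blob']
```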
1119def get_geometry_cols( 1120 df: 'pd.DataFrame', 1121 with_types_srids: bool = False, 1122) -> Union[List[str], Dict[str, Any]]: 1123 """ 1124 Get the columns which contain shapely objects from a Pandas DataFrame. 1125 1126 Parameters 1127 ---------- 1128 df: pd.DataFrame 1129 The DataFrame which may contain bytes strings. 1130 1131 with_types_srids: bool, default False 1132 If `True`, return a dictionary mapping columns to geometry types and SRIDs. 1133 1134 Returns 1135 ------- 1136 A list of columns to treat as `geometry`. 1137 If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID). 1138 """ 1139 if df is None: 1140 return [] if not with_types_srids else {} 1141 1142 is_dask = 'dask' in df.__module__ 1143 if is_dask: 1144 df = get_first_valid_dask_partition(df) 1145 1146 if len(df) == 0: 1147 return [] if not with_types_srids else {} 1148 1149 cols_indices = { 1150 col: df[col].first_valid_index() 1151 for col in df.columns 1152 } 1153 geo_cols = [ 1154 col 1155 for col, ix in cols_indices.items() 1156 if ( 1157 ix is not None 1158 and 1159 'shapely' in str(type(df.loc[ix][col])) 1160 ) 1161 ] 1162 if not with_types_srids: 1163 return geo_cols 1164 1165 gpd = mrsm.attempt_import('geopandas', lazy=False) 1166 geo_cols_types_srids = {} 1167 for col in geo_cols: 1168 try: 1169 sample_geo_series = gpd.GeoSeries(df[col], crs=None) 1170 geometry_types = { 1171 geom.geom_type 1172 for geom in sample_geo_series 1173 if hasattr(geom, 'geom_type') 1174 } 1175 geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series) 1176 srid = ( 1177 ( 1178 sample_geo_series.crs.sub_crs_list[0].to_epsg() 1179 if sample_geo_series.crs.is_compound 1180 else sample_geo_series.crs.to_epsg() 1181 ) 1182 if sample_geo_series.crs 1183 else 0 1184 ) 1185 geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry' 1186 if geometry_type != 'geometry' and geometry_has_z: 1187 geometry_type = geometry_type + 'Z' 1188 except Exception: 1189 srid = 0 1190 geometry_type = 'geometry' 1191 geo_cols_types_srids[col] = (geometry_type, srid) 1192 1193 return geo_cols_types_srids
Get the columns which contain shapely objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain shapely geometry objects.
- with_types_srids (bool, default False): If `True`, return a dictionary mapping columns to geometry types and SRIDs.
Returns
- A list of columns to treat as `geometry`.
- If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
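A hedged sketch (requires `shapely`; SRID detection additionally imports `geopandas`, and a series without a CRS falls back to SRID 0):
```python
import pandas as pd
from shapely.geometry import Point
from meerschaum.utils.dataframe import get_geometry_cols

df = pd.DataFrame({'geom': [Point(0, 0), Point(1, 1)], 'x': [1, 2]})
print(get_geometry_cols(df))                         # ['geom']
print(get_geometry_cols(df, with_types_srids=True))  # e.g. {'geom': ('Point', 0)}
```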
1196def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]: 1197 """ 1198 Return a dtypes dictionary mapping columns to specific geometry types (type, srid). 1199 """ 1200 geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True) 1201 new_cols_types = {} 1202 for col, (geometry_type, srid) in geometry_cols_types_srids.items(): 1203 new_dtype = "geometry" 1204 modifier = "" 1205 if not srid and geometry_type.lower() == 'geometry': 1206 new_cols_types[col] = new_dtype 1207 continue 1208 1209 modifier = "[" 1210 if geometry_type.lower() != 'geometry': 1211 modifier += f"{geometry_type}" 1212 1213 if srid: 1214 if modifier != '[': 1215 modifier += ", " 1216 modifier += f"{srid}" 1217 modifier += "]" 1218 new_cols_types[col] = f"{new_dtype}{modifier}" 1219 return new_cols_types
Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
1222def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]: 1223 """ 1224 Return a dtypes dictionary mapping special columns to their dtypes. 1225 """ 1226 return { 1227 **{col: 'json' for col in get_json_cols(df)}, 1228 **{col: 'uuid' for col in get_uuid_cols(df)}, 1229 **{col: 'bytes' for col in get_bytes_cols(df)}, 1230 **{col: 'bool' for col in get_bool_cols(df)}, 1231 **{col: 'numeric' for col in get_numeric_cols(df)}, 1232 **{col: 'date' for col in get_date_cols(df)}, 1233 **get_datetime_cols_types(df), 1234 **get_geometry_cols_types(df), 1235 }
Return a dtypes dictionary mapping special columns to their dtypes.
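This aggregates the detectors above into a single dtypes dictionary; a minimal sketch:
```python
import uuid
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_special_cols

df = pd.DataFrame({
    'meta': [{'a': 1}],
    'id': [uuid.uuid4()],
    'price': [Decimal('9.99')],
})
print(get_special_cols(df))  # {'meta': 'json', 'id': 'uuid', 'price': 'numeric'}
```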
1238def enforce_dtypes( 1239 df: 'pd.DataFrame', 1240 dtypes: Dict[str, str], 1241 explicit_dtypes: Optional[Dict[str, str]] = None, 1242 safe_copy: bool = True, 1243 coerce_numeric: bool = False, 1244 coerce_timezone: bool = True, 1245 strip_timezone: bool = False, 1246 debug: bool = False, 1247) -> 'pd.DataFrame': 1248 """ 1249 Enforce the `dtypes` dictionary on a DataFrame. 1250 1251 Parameters 1252 ---------- 1253 df: pd.DataFrame 1254 The DataFrame on which to enforce dtypes. 1255 1256 dtypes: Dict[str, str] 1257 The data types to attempt to enforce on the DataFrame. 1258 1259 explicit_dtypes: Optional[Dict[str, str]], default None 1260 If provided, automatic dtype coersion will respect explicitly configured 1261 dtypes (`int`, `float`, `numeric`). 1262 1263 safe_copy: bool, default True 1264 If `True`, create a copy before comparing and modifying the dataframes. 1265 Setting to `False` may mutate the DataFrames. 1266 See `meerschaum.utils.dataframe.filter_unseen_df`. 1267 1268 coerce_numeric: bool, default False 1269 If `True`, convert float and int collisions to numeric. 1270 1271 coerce_timezone: bool, default True 1272 If `True`, convert datetimes to UTC. 1273 1274 strip_timezone: bool, default False 1275 If `coerce_timezone` and `strip_timezone` are `True`, 1276 remove timezone information from datetimes. 1277 1278 debug: bool, default False 1279 Verbosity toggle. 1280 1281 Returns 1282 ------- 1283 The Pandas DataFrame with the types enforced. 1284 """ 1285 import json 1286 import functools 1287 from meerschaum.utils.debug import dprint 1288 from meerschaum.utils.formatting import pprint 1289 from meerschaum.utils.dtypes import ( 1290 are_dtypes_equal, 1291 to_pandas_dtype, 1292 is_dtype_numeric, 1293 attempt_cast_to_numeric, 1294 attempt_cast_to_uuid, 1295 attempt_cast_to_bytes, 1296 attempt_cast_to_geometry, 1297 coerce_timezone as _coerce_timezone, 1298 get_geometry_type_srid, 1299 ) 1300 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 1301 pandas = mrsm.attempt_import('pandas') 1302 is_dask = 'dask' in df.__module__ 1303 if safe_copy: 1304 df = df.copy() 1305 if len(df.columns) == 0: 1306 if debug: 1307 dprint("Incoming DataFrame has no columns. 
Skipping enforcement...") 1308 return df 1309 1310 explicit_dtypes = explicit_dtypes or {} 1311 pipe_pandas_dtypes = { 1312 col: to_pandas_dtype(typ) 1313 for col, typ in dtypes.items() 1314 } 1315 json_cols = [ 1316 col 1317 for col, typ in dtypes.items() 1318 if typ == 'json' 1319 ] 1320 numeric_cols = [ 1321 col 1322 for col, typ in dtypes.items() 1323 if typ.startswith('numeric') 1324 ] 1325 geometry_cols_types_srids = { 1326 col: get_geometry_type_srid(typ, default_srid=0) 1327 for col, typ in dtypes.items() 1328 if typ.startswith('geometry') or typ.startswith('geography') 1329 } 1330 uuid_cols = [ 1331 col 1332 for col, typ in dtypes.items() 1333 if typ == 'uuid' 1334 ] 1335 bytes_cols = [ 1336 col 1337 for col, typ in dtypes.items() 1338 if typ == 'bytes' 1339 ] 1340 datetime_cols = [ 1341 col 1342 for col, typ in dtypes.items() 1343 if are_dtypes_equal(typ, 'datetime') 1344 ] 1345 df_numeric_cols = get_numeric_cols(df) 1346 if debug: 1347 dprint("Desired data types:") 1348 pprint(dtypes) 1349 dprint("Data types for incoming DataFrame:") 1350 pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()}) 1351 1352 if json_cols and len(df) > 0: 1353 if debug: 1354 dprint(f"Checking columns for JSON encoding: {json_cols}") 1355 for col in json_cols: 1356 if col in df.columns: 1357 try: 1358 df[col] = df[col].apply( 1359 ( 1360 lambda x: ( 1361 json.loads(x) 1362 if isinstance(x, str) 1363 else x 1364 ) 1365 ) 1366 ) 1367 except Exception as e: 1368 if debug: 1369 dprint(f"Unable to parse column '{col}' as JSON:\n{e}") 1370 1371 if numeric_cols: 1372 if debug: 1373 dprint(f"Checking for numerics: {numeric_cols}") 1374 for col in numeric_cols: 1375 precision, scale = get_numeric_precision_scale(None, dtypes.get(col, '')) 1376 if col in df.columns: 1377 try: 1378 df[col] = df[col].apply( 1379 functools.partial( 1380 attempt_cast_to_numeric, 1381 quantize=True, 1382 precision=precision, 1383 scale=scale, 1384 ) 1385 ) 1386 except Exception as e: 1387 if debug: 1388 dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}") 1389 1390 if uuid_cols: 1391 if debug: 1392 dprint(f"Checking for UUIDs: {uuid_cols}") 1393 for col in uuid_cols: 1394 if col in df.columns: 1395 try: 1396 df[col] = df[col].apply(attempt_cast_to_uuid) 1397 except Exception as e: 1398 if debug: 1399 dprint(f"Unable to parse column '{col}' as UUID:\n{e}") 1400 1401 if bytes_cols: 1402 if debug: 1403 dprint(f"Checking for bytes: {bytes_cols}") 1404 for col in bytes_cols: 1405 if col in df.columns: 1406 try: 1407 df[col] = df[col].apply(attempt_cast_to_bytes) 1408 except Exception as e: 1409 if debug: 1410 dprint(f"Unable to parse column '{col}' as bytes:\n{e}") 1411 1412 if datetime_cols and coerce_timezone: 1413 if debug: 1414 dprint(f"Checking for datetime conversion: {datetime_cols}") 1415 for col in datetime_cols: 1416 if col in df.columns: 1417 if not strip_timezone and 'utc' in str(df.dtypes[col]).lower(): 1418 if debug: 1419 dprint(f"Skip UTC coersion for column '{col}' ({str(df[col].dtype)}).") 1420 continue 1421 if strip_timezone and ',' not in str(df.dtypes[col]): 1422 if debug: 1423 dprint( 1424 f"Skip UTC coersion (stripped) for column '{col}' " 1425 f"({str(df[col].dtype)})." 
1426 ) 1427 continue 1428 1429 if debug: 1430 dprint( 1431 f"Data type for column '{col}' before timezone coersion: " 1432 f"{str(df[col].dtype)}" 1433 ) 1434 1435 df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone) 1436 if debug: 1437 dprint( 1438 f"Data type for column '{col}' after timezone coersion: " 1439 f"{str(df[col].dtype)}" 1440 ) 1441 1442 if geometry_cols_types_srids: 1443 geopandas = mrsm.attempt_import('geopandas') 1444 if debug: 1445 dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}") 1446 parsed_geom_cols = [] 1447 for col in geometry_cols_types_srids: 1448 if col not in df.columns: 1449 continue 1450 try: 1451 df[col] = attempt_cast_to_geometry(df[col]) 1452 parsed_geom_cols.append(col) 1453 except Exception as e: 1454 import traceback 1455 traceback.print_exc() 1456 if debug: 1457 dprint(f"Unable to parse column '{col}' as geometry:\n{e}") 1458 1459 if parsed_geom_cols: 1460 if debug: 1461 dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...") 1462 try: 1463 _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]] 1464 df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid) 1465 for col, (_, srid) in geometry_cols_types_srids.items(): 1466 if srid: 1467 if debug: 1468 dprint(f"Setting '{col}' to SRID '{srid}'...") 1469 _ = df[col].set_crs(srid) 1470 if parsed_geom_cols[0] not in df.columns: 1471 df.rename_geometry(parsed_geom_cols[0], inplace=True) 1472 except (ValueError, TypeError): 1473 import traceback 1474 dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}") 1475 1476 df_dtypes = {c: str(t) for c, t in df.dtypes.items()} 1477 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1478 if debug: 1479 dprint("Data types match. Exiting enforcement...") 1480 return df 1481 1482 common_dtypes = {} 1483 common_diff_dtypes = {} 1484 for col, typ in pipe_pandas_dtypes.items(): 1485 if col in df_dtypes: 1486 common_dtypes[col] = typ 1487 if not are_dtypes_equal(typ, df_dtypes[col]): 1488 common_diff_dtypes[col] = df_dtypes[col] 1489 1490 if debug: 1491 dprint("Common columns with different dtypes:") 1492 pprint(common_diff_dtypes) 1493 1494 detected_dt_cols = {} 1495 for col, typ in common_diff_dtypes.items(): 1496 if 'datetime' in typ and 'datetime' in common_dtypes[col]: 1497 df_dtypes[col] = typ 1498 detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col]) 1499 for col in detected_dt_cols: 1500 del common_diff_dtypes[col] 1501 1502 if debug: 1503 dprint("Common columns with different dtypes (after dates):") 1504 pprint(common_diff_dtypes) 1505 1506 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1507 if debug: 1508 dprint( 1509 "The incoming DataFrame has mostly the same types, skipping enforcement." 1510 + "The only detected difference was in the following datetime columns." 
1511 ) 1512 pprint(detected_dt_cols) 1513 return df 1514 1515 for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items(): 1516 previous_typ = common_dtypes[col] 1517 mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ)) 1518 explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float') 1519 explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int') 1520 explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric') 1521 all_nan = ( 1522 df[col].isnull().all() 1523 if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int) 1524 else None 1525 ) 1526 cast_to_numeric = explicitly_numeric or ( 1527 ( 1528 col in df_numeric_cols 1529 or ( 1530 mixed_numeric_types 1531 and not (explicitly_float or explicitly_int) 1532 and not all_nan 1533 and coerce_numeric 1534 ) 1535 ) 1536 ) 1537 1538 if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types): 1539 from meerschaum.utils.formatting import make_header 1540 msg = ( 1541 make_header(f"Coercing column '{col}' to numeric:", left_pad=0) 1542 + "\n" 1543 + f" Previous type: {previous_typ}\n" 1544 + f" Current type: {typ if col not in df_numeric_cols else 'Decimal'}" 1545 + ("\n Column is explicitly numeric." if explicitly_numeric else "") 1546 ) if cast_to_numeric else ( 1547 f"Will not coerce column '{col}' to numeric.\n" 1548 f" Numeric columns in dataframe: {df_numeric_cols}\n" 1549 f" Mixed numeric types: {mixed_numeric_types}\n" 1550 f" Explicitly float: {explicitly_float}\n" 1551 f" Explicitly int: {explicitly_int}\n" 1552 f" All NaN: {all_nan}\n" 1553 f" Coerce numeric: {coerce_numeric}" 1554 ) 1555 dprint(msg) 1556 1557 if cast_to_numeric: 1558 common_dtypes[col] = attempt_cast_to_numeric 1559 common_diff_dtypes[col] = attempt_cast_to_numeric 1560 1561 for d in common_diff_dtypes: 1562 t = common_dtypes[d] 1563 if debug: 1564 dprint(f"Casting column {d} to dtype {t}.") 1565 try: 1566 df[d] = ( 1567 df[d].apply(t) 1568 if callable(t) 1569 else df[d].astype(t) 1570 ) 1571 except Exception as e: 1572 if debug: 1573 dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}") 1574 if 'int' in str(t).lower(): 1575 try: 1576 df[d] = df[d].astype('float64').astype(t) 1577 except Exception: 1578 if debug: 1579 dprint(f"Was unable to convert to float then {t}.") 1580 return df
Enforce the dtypes dictionary on a DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame on which to enforce dtypes.
- dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
- explicit_dtypes (Optional[Dict[str, str]], default None): If provided, automatic dtype coercion will respect explicitly configured dtypes (`int`, `float`, `numeric`).
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames. See `meerschaum.utils.dataframe.filter_unseen_df`.
- coerce_numeric (bool, default False): If `True`, convert float and int collisions to numeric.
- coerce_timezone (bool, default True): If `True`, convert datetimes to UTC.
- strip_timezone (bool, default False): If `coerce_timezone` and `strip_timezone` are `True`, remove timezone information from datetimes.
- debug (bool, default False): Verbosity toggle.
Returns
- The Pandas DataFrame with the types enforced.
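A minimal sketch, assuming the special `'json'` dtype resolves to an object column of parsed values:
```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': [1.0, 2.0], 'b': ['{"x": 1}', '{"x": 2}']})
df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
print(df['a'].dtype)  # Int64
print(df['b'][0])     # {'x': 1} (JSON strings are parsed into Python objects)
```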
1583def get_datetime_bound_from_df( 1584 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1585 datetime_column: str, 1586 minimum: bool = True, 1587) -> Union[int, datetime, None]: 1588 """ 1589 Return the minimum or maximum datetime (or integer) from a DataFrame. 1590 1591 Parameters 1592 ---------- 1593 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1594 The DataFrame, list, or dict which contains the range axis. 1595 1596 datetime_column: str 1597 The name of the datetime (or int) column. 1598 1599 minimum: bool 1600 Whether to return the minimum (default) or maximum value. 1601 1602 Returns 1603 ------- 1604 The minimum or maximum datetime value in the dataframe, or `None`. 1605 """ 1606 from meerschaum.utils.dtypes import to_datetime, value_is_null 1607 1608 if df is None: 1609 return None 1610 if not datetime_column: 1611 return None 1612 1613 def compare(a, b): 1614 if a is None: 1615 return b 1616 if b is None: 1617 return a 1618 if minimum: 1619 return a if a < b else b 1620 return a if a > b else b 1621 1622 if isinstance(df, list): 1623 if len(df) == 0: 1624 return None 1625 best_yet = df[0].get(datetime_column, None) 1626 for doc in df: 1627 val = doc.get(datetime_column, None) 1628 best_yet = compare(best_yet, val) 1629 return best_yet 1630 1631 if isinstance(df, dict): 1632 if datetime_column not in df: 1633 return None 1634 best_yet = df[datetime_column][0] 1635 for val in df[datetime_column]: 1636 best_yet = compare(best_yet, val) 1637 return best_yet 1638 1639 if 'DataFrame' in str(type(df)): 1640 from meerschaum.utils.dtypes import are_dtypes_equal 1641 pandas = mrsm.attempt_import('pandas') 1642 is_dask = 'dask' in df.__module__ 1643 1644 if datetime_column not in df.columns: 1645 return None 1646 1647 try: 1648 dt_val = ( 1649 df[datetime_column].min(skipna=True) 1650 if minimum 1651 else df[datetime_column].max(skipna=True) 1652 ) 1653 except Exception: 1654 dt_val = pandas.NA 1655 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1656 dt_val = dt_val.compute() 1657 1658 return ( 1659 to_datetime(dt_val, as_pydatetime=True) 1660 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1661 else (dt_val if not value_is_null(dt_val) else None) 1662 ) 1663 1664 return None
Return the minimum or maximum datetime (or integer) from a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
- datetime_column (str): The name of the datetime (or int) column.
- minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
- The minimum or maximum datetime value in the dataframe, or `None`.
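Example (a minimal sketch using the list-of-dicts form):
```python
from datetime import datetime
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': datetime(2024, 1, 1)},
    {'dt': datetime(2024, 1, 2)},
]
print(get_datetime_bound_from_df(docs, 'dt'))                 # 2024-01-01 00:00:00
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 2024-01-02 00:00:00
```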
1667def get_unique_index_values( 1668 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1669 indices: List[str], 1670) -> Dict[str, List[Any]]: 1671 """ 1672 Return a dictionary of the unique index values in a DataFrame. 1673 1674 Parameters 1675 ---------- 1676 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1677 The dataframe (or list or dict) which contains index values. 1678 1679 indices: List[str] 1680 The list of index columns. 1681 1682 Returns 1683 ------- 1684 A dictionary mapping indices to unique values. 1685 """ 1686 if df is None: 1687 return {} 1688 if 'dataframe' in str(type(df)).lower(): 1689 pandas = mrsm.attempt_import('pandas') 1690 return { 1691 col: list({ 1692 (val if val is not pandas.NA else None) 1693 for val in df[col].unique() 1694 }) 1695 for col in indices 1696 if col in df.columns 1697 } 1698 1699 unique_indices = defaultdict(lambda: set()) 1700 if isinstance(df, list): 1701 for doc in df: 1702 for index in indices: 1703 if index in doc: 1704 unique_indices[index].add(doc[index]) 1705 1706 elif isinstance(df, dict): 1707 for index in indices: 1708 if index in df: 1709 unique_indices[index] = unique_indices[index].union(set(df[index])) 1710 1711 return {key: list(val) for key, val in unique_indices.items()}
Return a dictionary of the unique index values in a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
- indices (List[str]): The list of index columns.
Returns
- A dictionary mapping indices to unique values.
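Example (a minimal sketch; list order within the values is not guaranteed):
```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'id': 1, 'color': 'red'},
    {'id': 2, 'color': 'red'},
]
print(get_unique_index_values(docs, ['id', 'color']))
# {'id': [1, 2], 'color': ['red']}
```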
1714def df_is_chunk_generator(df: Any) -> bool: 1715 """ 1716 Determine whether to treat `df` as a chunk generator. 1717 1718 Note this should only be used in a context where generators are expected, 1719 as it will return `True` for any iterable. 1720 1721 Parameters 1722 ---------- 1723 The DataFrame or chunk generator to evaluate. 1724 1725 Returns 1726 ------- 1727 A `bool` indicating whether to treat `df` as a generator. 1728 """ 1729 return ( 1730 not isinstance(df, (dict, list, str)) 1731 and 'DataFrame' not in str(type(df)) 1732 and isinstance(df, (Generator, Iterable, Iterator)) 1733 )
Determine whether to treat `df` as a chunk generator.
Note this should only be used in a context where generators are expected, as it will return `True` for any iterable.
Parameters
- df (Any): The DataFrame or chunk generator to evaluate.
Returns
- A `bool` indicating whether to treat `df` as a generator.
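Example (a minimal sketch):
```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

df = pd.DataFrame({'a': [1]})
chunks = (chunk for chunk in [df])
print(df_is_chunk_generator(df))      # False: a DataFrame is not a chunk generator.
print(df_is_chunk_generator(chunks))  # True: any generator/iterable qualifies.
```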
1736def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1737 """ 1738 Return the Dask `npartitions` value for a given `chunksize`. 1739 """ 1740 if chunksize == -1: 1741 from meerschaum.config import get_config 1742 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1743 if chunksize is None: 1744 return 1 1745 return -1 * chunksize
Return the Dask `npartitions` value for a given `chunksize`.
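Per the source, `None` maps to a single partition, `-1` is resolved from the configured SQL chunksize, and positive chunk sizes are returned negated (presumably a sentinel consumed by the Dask wrappers; that interpretation is an assumption, not confirmed here):
```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))    # 1
print(chunksize_to_npartitions(10_000))  # -10000
```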
1748def df_from_literal( 1749 pipe: Optional[mrsm.Pipe] = None, 1750 literal: Optional[str] = None, 1751 debug: bool = False 1752) -> 'pd.DataFrame': 1753 """ 1754 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1755 1756 Parameters 1757 ---------- 1758 pipe: Optional['meerschaum.Pipe'], default None 1759 The pipe which will consume the literal value. 1760 1761 Returns 1762 ------- 1763 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1764 and the literal as the value. 1765 """ 1766 from meerschaum.utils.packages import import_pandas 1767 from meerschaum.utils.warnings import error, warn 1768 from meerschaum.utils.debug import dprint 1769 from meerschaum.utils.dtypes import get_current_timestamp 1770 1771 if pipe is None or literal is None: 1772 error("Please provide a Pipe and a literal value") 1773 1774 dt_col = pipe.columns.get( 1775 'datetime', 1776 mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing') 1777 ) 1778 val_col = pipe.get_val_column(debug=debug) 1779 1780 val = literal 1781 if isinstance(literal, str): 1782 if debug: 1783 dprint(f"Received literal string: '{literal}'") 1784 import ast 1785 try: 1786 val = ast.literal_eval(literal) 1787 except Exception: 1788 warn( 1789 "Failed to parse value from string:\n" + f"{literal}" + 1790 "\n\nWill cast as a string instead."\ 1791 ) 1792 val = literal 1793 1794 now = get_current_timestamp(pipe.precision) 1795 pd = import_pandas() 1796 return pd.DataFrame({dt_col: [now], val_col: [val]})
Construct a dataframe from a literal value, using the pipe's datetime and value column names.
Parameters
- pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
- literal (Optional[str], default None): The literal value to insert; strings are parsed with `ast.literal_eval` and fall back to the raw string on failure.
- debug (bool, default False): Verbosity toggle.
Returns
- A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
- and the literal as the value.
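Example (a hedged sketch; the pipe keys below are hypothetical, and the value column name is derived by the pipe itself):
```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

pipe = mrsm.Pipe('demo', 'literals', columns={'datetime': 'dt'})
df = df_from_literal(pipe, '42')  # the string '42' parses to the integer 42
print(df)
```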
1799def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1800 """ 1801 Return the first valid Dask DataFrame partition (if possible). 1802 """ 1803 pdf = None 1804 for partition in ddf.partitions: 1805 try: 1806 pdf = partition.compute() 1807 except Exception: 1808 continue 1809 if len(pdf) > 0: 1810 return pdf 1811 _ = mrsm.attempt_import('partd', lazy=False) 1812 return ddf.compute()
Return the first valid Dask DataFrame partition (if possible).
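Example (a minimal sketch, assuming `dask` is installed):
```python
import pandas as pd
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.dataframe import get_first_valid_dask_partition

dd = attempt_import('dask.dataframe')
ddf = dd.from_pandas(pd.DataFrame({'a': range(10)}), npartitions=3)
pdf = get_first_valid_dask_partition(ddf)  # first non-empty partition as pandas
```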
1815def query_df( 1816 df: 'pd.DataFrame', 1817 params: Optional[Dict[str, Any]] = None, 1818 begin: Union[datetime, int, None] = None, 1819 end: Union[datetime, int, None] = None, 1820 datetime_column: Optional[str] = None, 1821 select_columns: Optional[List[str]] = None, 1822 omit_columns: Optional[List[str]] = None, 1823 inplace: bool = False, 1824 reset_index: bool = False, 1825 coerce_types: bool = False, 1826 debug: bool = False, 1827) -> 'pd.DataFrame': 1828 """ 1829 Query the dataframe with the params dictionary. 1830 1831 Parameters 1832 ---------- 1833 df: pd.DataFrame 1834 The DataFrame to query against. 1835 1836 params: Optional[Dict[str, Any]], default None 1837 The parameters dictionary to use for the query. 1838 1839 begin: Union[datetime, int, None], default None 1840 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1841 greater than or equal to this value. 1842 1843 end: Union[datetime, int, None], default None 1844 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1845 less than this value. 1846 1847 datetime_column: Optional[str], default None 1848 A `datetime_column` must be provided to use `begin` and `end`. 1849 1850 select_columns: Optional[List[str]], default None 1851 If provided, only return these columns. 1852 1853 omit_columns: Optional[List[str]], default None 1854 If provided, do not include these columns in the result. 1855 1856 inplace: bool, default False 1857 If `True`, modify the DataFrame inplace rather than creating a new DataFrame. 1858 1859 reset_index: bool, default False 1860 If `True`, reset the index in the resulting DataFrame. 1861 1862 coerce_types: bool, default False 1863 If `True`, cast the dataframe and parameters as strings before querying. 1864 1865 Returns 1866 ------- 1867 A Pandas DataFrame query result. 
1868 """ 1869 1870 def _process_select_columns(_df): 1871 if not select_columns: 1872 return 1873 for col in list(_df.columns): 1874 if col not in select_columns: 1875 del _df[col] 1876 1877 def _process_omit_columns(_df): 1878 if not omit_columns: 1879 return 1880 for col in list(_df.columns): 1881 if col in omit_columns: 1882 del _df[col] 1883 1884 if not params and not begin and not end: 1885 if not inplace: 1886 df = df.copy() 1887 _process_select_columns(df) 1888 _process_omit_columns(df) 1889 return df 1890 1891 from meerschaum.utils.debug import dprint 1892 from meerschaum.utils.misc import get_in_ex_params 1893 from meerschaum.utils.warnings import warn 1894 from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null 1895 dateutil_parser = mrsm.attempt_import('dateutil.parser') 1896 pandas = mrsm.attempt_import('pandas') 1897 NA = pandas.NA 1898 1899 if params: 1900 proto_in_ex_params = get_in_ex_params(params) 1901 for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items(): 1902 if proto_ex_vals: 1903 coerce_types = True 1904 break 1905 params = params.copy() 1906 for key, val in {k: v for k, v in params.items()}.items(): 1907 if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'): 1908 if None in val: 1909 val = [item for item in val if item is not None] + [NA] 1910 params[key] = val 1911 if coerce_types: 1912 params[key] = [str(x) for x in val] 1913 else: 1914 if value_is_null(val): 1915 val = NA 1916 params[key] = NA 1917 if coerce_types: 1918 params[key] = str(val) 1919 1920 dtypes = {col: str(typ) for col, typ in df.dtypes.items()} 1921 1922 if inplace: 1923 df.fillna(NA, inplace=True) 1924 else: 1925 df = df.infer_objects().fillna(NA) 1926 1927 if isinstance(begin, str): 1928 begin = dateutil_parser.parse(begin) 1929 if isinstance(end, str): 1930 end = dateutil_parser.parse(end) 1931 1932 if begin is not None or end is not None: 1933 if not datetime_column or datetime_column not in df.columns: 1934 warn( 1935 f"The datetime column '{datetime_column}' is not present in the Dataframe, " 1936 + "ignoring begin and end...", 1937 ) 1938 begin, end = None, None 1939 1940 if debug: 1941 dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}") 1942 1943 if datetime_column and (begin is not None or end is not None): 1944 if debug: 1945 dprint("Checking for datetime column compatability.") 1946 1947 from meerschaum.utils.dtypes import coerce_timezone 1948 df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime') 1949 begin_is_int = are_dtypes_equal(str(type(begin)), 'int') 1950 end_is_int = are_dtypes_equal(str(type(end)), 'int') 1951 1952 if df_is_dt: 1953 df_tz = ( 1954 getattr(df[datetime_column].dt, 'tz', None) 1955 if hasattr(df[datetime_column], 'dt') 1956 else None 1957 ) 1958 1959 if begin_is_int: 1960 begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None) 1961 if debug: 1962 dprint(f"`begin` will be cast to '{begin}'.") 1963 if end_is_int: 1964 end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None) 1965 if debug: 1966 dprint(f"`end` will be cast to '{end}'.") 1967 1968 begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None 1969 end = coerce_timezone(end, strip_utc=(df_tz is None)) if begin is not None else None 1970 1971 in_ex_params = get_in_ex_params(params) 1972 1973 masks = [ 1974 ( 1975 (df[datetime_column] >= begin) 1976 if begin is not None and datetime_column 1977 else True 1978 ) & ( 1979 (df[datetime_column] < end) 1980 if end 
is not None and datetime_column 1981 else True 1982 ) 1983 ] 1984 1985 masks.extend([ 1986 ( 1987 ( 1988 (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals) 1989 if in_vals 1990 else True 1991 ) & ( 1992 ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals) 1993 if ex_vals 1994 else True 1995 ) 1996 ) 1997 for col, (in_vals, ex_vals) in in_ex_params.items() 1998 if col in df.columns 1999 ]) 2000 query_mask = masks[0] 2001 for mask in masks[1:]: 2002 query_mask = query_mask & mask 2003 2004 original_cols = df.columns 2005 2006 ### NOTE: We must cast bool columns to `boolean[pyarrow]` 2007 ### to allow for `<NA>` values. 2008 bool_cols = [ 2009 col 2010 for col, typ in df.dtypes.items() 2011 if are_dtypes_equal(str(typ), 'bool') 2012 ] 2013 for col in bool_cols: 2014 df[col] = df[col].astype('boolean[pyarrow]') 2015 2016 if not isinstance(query_mask, bool): 2017 df['__mrsm_mask'] = ( 2018 query_mask.astype('boolean[pyarrow]') 2019 if hasattr(query_mask, 'astype') 2020 else query_mask 2021 ) 2022 2023 if inplace: 2024 df.where(query_mask, other=NA, inplace=True) 2025 df.dropna(how='all', inplace=True) 2026 result_df = df 2027 else: 2028 result_df = df.where(query_mask, other=NA) 2029 result_df.dropna(how='all', inplace=True) 2030 2031 else: 2032 result_df = df 2033 2034 if '__mrsm_mask' in df.columns: 2035 del df['__mrsm_mask'] 2036 if '__mrsm_mask' in result_df.columns: 2037 del result_df['__mrsm_mask'] 2038 2039 if reset_index: 2040 result_df.reset_index(drop=True, inplace=True) 2041 2042 result_df = enforce_dtypes( 2043 result_df, 2044 dtypes, 2045 safe_copy=False, 2046 debug=debug, 2047 coerce_numeric=False, 2048 coerce_timezone=False, 2049 ) 2050 2051 if select_columns == ['*']: 2052 select_columns = None 2053 2054 if not select_columns and not omit_columns: 2055 return result_df[original_cols] 2056 2057 _process_select_columns(result_df) 2058 _process_omit_columns(result_df) 2059 2060 return result_df
Query the dataframe with the params dictionary.
Parameters
- df (pd.DataFrame): The DataFrame to query against.
- params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
- begin (Union[datetime, int, None], default None): If `begin` and `datetime_column` are provided, only return rows with a timestamp greater than or equal to this value.
- end (Union[datetime, int, None], default None): If `end` and `datetime_column` are provided, only return rows with a timestamp less than this value.
- datetime_column (Optional[str], default None): A `datetime_column` must be provided to use `begin` and `end`.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
- inplace (bool, default False): If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
- reset_index (bool, default False): If `True`, reset the index in the resulting DataFrame.
- coerce_types (bool, default False): If `True`, cast the dataframe and parameters as strings before querying.
Returns
- A Pandas DataFrame query result.
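Example (a minimal sketch combining `params` with a `begin` bound; the data is illustrative):
```python
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})
result = query_df(df, params={'color': 'red'}, begin='2024-01-02', datetime_column='dt')
# Expect only the 2024-01-03 row: 'blue' is filtered out by params,
# and 2024-01-01 falls before `begin`.
```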
2063def to_json( 2064 df: 'pd.DataFrame', 2065 safe_copy: bool = True, 2066 orient: str = 'records', 2067 date_format: str = 'iso', 2068 date_unit: str = 'us', 2069 double_precision: int = 15, 2070 geometry_format: str = 'geojson', 2071 **kwargs: Any 2072) -> str: 2073 """ 2074 Serialize the given dataframe as a JSON string. 2075 2076 Parameters 2077 ---------- 2078 df: pd.DataFrame 2079 The DataFrame to be serialized. 2080 2081 safe_copy: bool, default True 2082 If `False`, modify the DataFrame inplace. 2083 2084 date_format: str, default 'iso' 2085 The default format for timestamps. 2086 2087 date_unit: str, default 'us' 2088 The precision of the timestamps. 2089 2090 double_precision: int, default 15 2091 The number of decimal places to use when encoding floating point values (maximum 15). 2092 2093 geometry_format: str, default 'geojson' 2094 The serialization format for geometry data. 2095 Accepted values are `geojson`, `wkb_hex`, and `wkt`. 2096 2097 Returns 2098 ------- 2099 A JSON string. 2100 """ 2101 import warnings 2102 import functools 2103 from meerschaum.utils.packages import import_pandas 2104 from meerschaum.utils.dtypes import ( 2105 serialize_bytes, 2106 serialize_decimal, 2107 serialize_geometry, 2108 ) 2109 pd = import_pandas() 2110 uuid_cols = get_uuid_cols(df) 2111 bytes_cols = get_bytes_cols(df) 2112 numeric_cols = get_numeric_cols(df) 2113 geometry_cols = get_geometry_cols(df) 2114 geometry_cols_srids = { 2115 col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0') 2116 for col in geometry_cols 2117 } if 'geodataframe' in str(type(df)).lower() else {} 2118 if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols): 2119 df = df.copy() 2120 if 'geodataframe' in str(type(df)).lower(): 2121 geometry_data = { 2122 col: df[col] 2123 for col in geometry_cols 2124 } 2125 df = pd.DataFrame({ 2126 col: df[col] 2127 for col in df.columns 2128 if col not in geometry_cols 2129 }) 2130 for col in geometry_cols: 2131 df[col] = pd.Series(ob for ob in geometry_data[col]) 2132 for col in uuid_cols: 2133 df[col] = df[col].astype(str) 2134 for col in bytes_cols: 2135 df[col] = df[col].apply(serialize_bytes) 2136 for col in numeric_cols: 2137 df[col] = df[col].apply(serialize_decimal) 2138 with warnings.catch_warnings(): 2139 warnings.simplefilter("ignore") 2140 for col in geometry_cols: 2141 srid = geometry_cols_srids.get(col, None) or None 2142 df[col] = pd.Series( 2143 serialize_geometry(val, geometry_format=geometry_format, srid=srid) 2144 for val in df[col] 2145 ) 2146 return df.infer_objects(copy=False).fillna(pd.NA).to_json( 2147 date_format=date_format, 2148 date_unit=date_unit, 2149 double_precision=double_precision, 2150 orient=orient, 2151 **kwargs 2152 )
Serialize the given dataframe as a JSON string.
Parameters
- df (pd.DataFrame): The DataFrame to be serialized.
- safe_copy (bool, default True): If `False`, modify the DataFrame inplace.
- orient (str, default 'records'): The JSON orientation passed to `pandas.DataFrame.to_json`.
- date_format (str, default 'iso'): The default format for timestamps.
- date_unit (str, default 'us'): The precision of the timestamps.
- double_precision (int, default 15): The number of decimal places to use when encoding floating point values (maximum 15).
- geometry_format (str, default 'geojson'): The serialization format for geometry data. Accepted values are `geojson`, `wkb_hex`, and `wkt`.
Returns
- A JSON string.
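Example (a minimal sketch; the output shown is indicative of the default `records` orientation):
```python
import pandas as pd
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({'a': [1, 2], 'b': ['x', None]})
print(to_json(df))
# '[{"a":1,"b":"x"},{"a":2,"b":null}]'  (nulls are preserved as JSON null)
```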
2155def to_simple_lines(df: 'pd.DataFrame') -> str: 2156 """ 2157 Serialize a Pandas Dataframe as lines of simple dictionaries. 2158 2159 Parameters 2160 ---------- 2161 df: pd.DataFrame 2162 The dataframe to serialize into simple lines text. 2163 2164 Returns 2165 ------- 2166 A string of simple line dictionaries joined by newlines. 2167 """ 2168 from meerschaum.utils.misc import to_simple_dict 2169 if df is None or len(df) == 0: 2170 return '' 2171 2172 docs = df.to_dict(orient='records') 2173 return '\n'.join(to_simple_dict(doc) for doc in docs)
Serialize a Pandas DataFrame as lines of simple dictionaries.
Parameters
- df (pd.DataFrame): The dataframe to serialize into simple lines text.
Returns
- A string of simple line dictionaries joined by newlines.
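Example (a minimal sketch; the exact line format comes from `meerschaum.utils.misc.to_simple_dict`, so the output is only indicative):
```python
import pandas as pd
from meerschaum.utils.dataframe import to_simple_lines

df = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
print(to_simple_lines(df))  # one simple dictionary per row, one row per line
```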
2176def parse_simple_lines(data: str) -> 'pd.DataFrame': 2177 """ 2178 Parse simple lines text into a DataFrame. 2179 2180 Parameters 2181 ---------- 2182 data: str 2183 The simple lines text to parse into a DataFrame. 2184 2185 Returns 2186 ------- 2187 A dataframe containing the rows serialized in `data`. 2188 """ 2189 from meerschaum.utils.misc import string_to_dict 2190 from meerschaum.utils.packages import import_pandas 2191 pd = import_pandas() 2192 lines = data.splitlines() 2193 try: 2194 docs = [string_to_dict(line) for line in lines] 2195 df = pd.DataFrame(docs) 2196 except Exception: 2197 df = None 2198 2199 if df is None: 2200 raise ValueError("Cannot parse simple lines into a dataframe.") 2201 2202 return df
Parse simple lines text into a DataFrame.
Parameters
- data (str): The simple lines text to parse into a DataFrame.
Returns
- A dataframe containing the rows serialized in `data`.
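Example (a hedged round-trip sketch; fidelity depends on the simple-dict format shared with `to_simple_lines`):
```python
import pandas as pd
from meerschaum.utils.dataframe import to_simple_lines, parse_simple_lines

df = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
lines = to_simple_lines(df)
df2 = parse_simple_lines(lines)  # round-trips the rows back into a DataFrame
```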