meerschaum.utils.dtypes
Utility functions for working with data types.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with data types.
"""

import traceback
import json
import uuid
import time
import struct
from datetime import timezone, datetime, date, timedelta
from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple
from meerschaum.utils.warnings import warn
from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG

### Common alternate spellings mapped to their canonical Meerschaum dtype names.
MRSM_ALIAS_DTYPES: Dict[str, str] = {
    'decimal': 'numeric',
    'Decimal': 'numeric',
    'number': 'numeric',
    'jsonl': 'json',
    'JSON': 'json',
    'binary': 'bytes',
    'blob': 'bytes',
    'varbinary': 'bytes',
    'bytea': 'bytes',
    'guid': 'uuid',
    'UUID': 'uuid',
    'geom': 'geometry',
    'geog': 'geography',
    'boolean': 'bool',
    'day': 'date',
}
### Canonical Meerschaum dtypes mapped to the pandas dtype used to store them.
### `None` is the fallback key for unknown dtypes.
MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
    'json': 'object',
    'numeric': 'object',
    'geometry': 'object',
    'geography': 'object',
    'uuid': 'object',
    'date': 'date32[day][pyarrow]',
    'datetime': 'datetime64[us, UTC]',
    'bool': 'bool[pyarrow]',
    'int': 'int64[pyarrow]',
    'int8': 'int8[pyarrow]',
    'int16': 'int16[pyarrow]',
    'int32': 'int32[pyarrow]',
    'int64': 'int64[pyarrow]',
    'str': 'string',
    'bytes': 'binary[pyarrow]',
    None: 'object',
}

### Multipliers to convert a value in seconds into each precision unit
### (e.g. 1 second == 1_000_000 microseconds, 1 second == 1/60 minutes).
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
    'nanosecond': 1_000_000_000,
    'microsecond': 1_000_000,
    'millisecond': 1000,
    'second': 1,
    'minute': (1 / 60),
    'hour': (1 / 3600),
    'day': (1 / 86400),
}

### Short precision-unit spellings mapped to the full unit names above.
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
    'ns': 'nanosecond',
    'us': 'microsecond',
    'ms': 'millisecond',
    's': 'second',
    'sec': 'second',
    'm': 'minute',
    'min': 'minute',
    'h': 'hour',
    'hr': 'hour',
    'd': 'day',
    'D': 'day',
}
### Full precision-unit names mapped to the abbreviations understood by pandas
### (e.g. `datetime64[us]`, `Timestamp.as_unit('ms')`).
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
    'nanosecond': 'ns',
    'microsecond': 'us',
    'millisecond': 'ms',
    'second': 's',
    'minute': 'min',
    'hour': 'hr',
    'day': 'D',
}


def to_pandas_dtype(dtype: str) -> str:
    """
    Cast a supported Meerschaum dtype to a Pandas dtype.

    Resolution order: exact Meerschaum dtype, alias, `numeric`/`geometry`/`geography`
    prefix, SQL database type (all-caps first word), then pandas itself.
    Falls back to `'object'` (with a warning) if pandas rejects the string.
    """
    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
    if known_dtype is not None:
        return known_dtype

    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
    if alias_dtype is not None:
        return MRSM_PD_DTYPES[alias_dtype]

    ### Parameterized forms like `numeric[10,2]` or `geometry[Point, 4326]`
    ### resolve to the base dtype's pandas storage.
    if dtype.startswith('numeric'):
        return MRSM_PD_DTYPES['numeric']

    if dtype.startswith('geometry'):
        return MRSM_PD_DTYPES['geometry']

    if dtype.startswith('geography'):
        return MRSM_PD_DTYPES['geography']

    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
    ### treat it as a SQL db type.
    if dtype.split(' ')[0].isupper():
        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
        return get_pd_type_from_db_type(dtype)

    from meerschaum.utils.packages import attempt_import
    _ = attempt_import('pyarrow', lazy=False)
    pandas = attempt_import('pandas', lazy=False)

    try:
        return str(pandas.api.types.pandas_dtype(dtype))
    except Exception:
        warn(
            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
            + f"{traceback.format_exc()}",
            stack=False,
        )
        return 'object'


def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    """
    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
        ### NOTE(review): `zip` truncates to the shorter key list, so a dict with
        ### extra keys may still compare equal — confirm this is intended.
        lkeys = sorted([str(k) for k in ldtype.keys()])
        rkeys = sorted([str(k) for k in rdtype.keys()])
        for lkey, rkey in zip(lkeys, rkeys):
            if lkey != rkey:
                return False
            ltype = ldtype[lkey]
            rtype = rdtype[rkey]
            if not are_dtypes_equal(ltype, rtype):
                return False
        return True

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    if ldtype in MRSM_ALIAS_DTYPES:
        ldtype = MRSM_ALIAS_DTYPES[ldtype]

    if rdtype in MRSM_ALIAS_DTYPES:
        rdtype = MRSM_ALIAS_DTYPES[rdtype]

    ### `object` is the pandas storage for several Meerschaum dtypes,
    ### so it matches each of the following families.
    json_dtypes = ('json', 'object')
    if ldtype in json_dtypes and rdtype in json_dtypes:
        return True

    numeric_dtypes = ('numeric', 'decimal', 'object')
    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
        return True

    uuid_dtypes = ('uuid', 'object')
    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
        return True

    bytes_dtypes = ('bytes', 'object', 'binary')
    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
        return True

    geometry_dtypes = ('geometry', 'object', 'geography')
    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
        return True

    if ldtype.lower() == rdtype.lower():
        return True

    ### Substring match: both sides must contain 'datetime' or 'timestamp'.
    datetime_dtypes = ('datetime', 'timestamp')
    ldtype_found_dt_prefix = False
    rdtype_found_dt_prefix = False
    for dt_prefix in datetime_dtypes:
        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
        return True

    string_dtypes = ('str', 'string', 'object')
    if ldtype in string_dtypes and rdtype in string_dtypes:
        return True

    int_dtypes = (
        'int', 'int64', 'int32', 'int16', 'int8',
        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
    )
    int_substrings = ('int',)
    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
        return True
    for substring in int_substrings:
        if substring in ldtype.lower() and substring in rdtype.lower():
            return True

    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
        return True

    bool_dtypes = ('bool', 'boolean')
    if ldtype in bool_dtypes and rdtype in bool_dtypes:
        return True

    date_dtypes = (
        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
    )
    if ldtype in date_dtypes and rdtype in date_dtypes:
        return True

    return False


def is_dtype_numeric(dtype: str) -> bool:
    """
    Determine whether a given `dtype` string
    should be considered compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    A bool indicating the dtype is compatible with `numeric`.
    """
    dtype_lower = dtype.lower()

    ### Substring match so parameterized forms (e.g. 'float64', 'int32') also qualify.
    acceptable_substrings = ('numeric', 'float', 'double', 'int')
    for substring in acceptable_substrings:
        if substring in dtype_lower:
            return True

    return False
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    """
    if isinstance(value, Decimal):
        ### Already a Decimal: only quantize if requested.
        if quantize and precision and scale:
            return quantize_decimal(value, precision, scale)
        return value
    try:
        ### Null-like inputs become Decimal NaN (not None).
        if value_is_null(value):
            return Decimal('NaN')

        dec = Decimal(str(value))
        if not quantize or not precision or not scale:
            return dec
        return quantize_decimal(dec, precision, scale)
    except Exception:
        ### On any parse failure, return the input unchanged.
        return value


def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a UUID (`uuid4`).

    Null-like values become `None`; unparseable values are returned unchanged.
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        return (
            uuid.UUID(str(value))
            if not value_is_null(value)
            else None
        )
    except Exception:
        return value


def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a bytestring.

    Null-like values become `None`; unparseable values are returned unchanged.
    """
    if isinstance(value, bytes):
        return value
    try:
        return (
            deserialize_bytes_string(str(value))
            if not value_is_null(value)
            else None
        )
    except Exception:
        return value


def attempt_cast_to_geometry(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.

    Accepts scalars (WKT, WKB, GPKG-WKB, GeoJSON dict/list) or pandas Series
    of such values (returned as a `geopandas.GeoSeries`).
    Values which cannot be parsed are returned unchanged.
    """
    ### Type checks are done by string matching to avoid importing
    ### pandas/shapely for non-geometry values.
    typ = str(type(value))
    if 'pandas' in typ and 'Series' in typ:
        if 'GeoSeries' in typ:
            return value

        gpd = mrsm.attempt_import('geopandas', lazy=False)
        if len(value) == 0:
            return gpd.GeoSeries([])

        ### Sniff the first non-null element to pick a parsing strategy
        ### for the whole Series.
        ix = value.first_valid_index()
        if ix is None:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        sample_val = value[ix]
        sample_typ = str(type(sample_val))
        if 'shapely' in sample_typ:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        ### GPKG-WKB elements are first stripped down to standard WKB.
        sample_is_gpkg = geometry_is_gpkg(sample_val)
        if sample_is_gpkg:
            try:
                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
        try:
            return (
                gpd.GeoSeries.from_wkt(value)
                if sample_is_wkt
                else gpd.GeoSeries.from_wkb(value)
            )
        except Exception:
            ### Fall back to element-wise casting if the bulk parse fails.
            traceback.print_exc()
            return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

    if 'shapely' in typ:
        return value

    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
        'shapely',
        'shapely.wkt',
        'shapely.wkb',
        lazy=False,
    )

    ### Dicts/lists are treated as GeoJSON.
    if isinstance(value, (dict, list)):
        try:
            return shapely.from_geojson(json.dumps(value))
        except Exception:
            return value

    value_is_gpkg = geometry_is_gpkg(value)
    if value_is_gpkg:
        try:
            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
            return shapely_wkb.loads(wkb_data)
        except Exception:
            return value

    ### `None` means the value looks like neither WKT nor WKB.
    value_is_wkt = geometry_is_wkt(value)
    if value_is_wkt is None:
        return value

    try:
        return (
            shapely_wkt.loads(value)
            if value_is_wkt
            else shapely_wkb.loads(value)
        )
    except Exception:
        pass

    return value


def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
    """
    Determine whether an input value should be treated as WKT or WKB geometry data.

    Parameters
    ----------
    value: Union[str, bytes]
        The input data to be parsed into geometry data.

    Returns
    -------
    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
    Return `None` if `value` should be parsed as neither.
    """
    import re
    if not isinstance(value, (str, bytes)):
        return None

    ### Raw bytes are always treated as WKB.
    if isinstance(value, bytes):
        return False

    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
    if re.match(wkt_pattern, value, re.IGNORECASE):
        return True

    ### An even-length hex string is assumed to be hex-encoded WKB.
    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
        return False

    return None


def geometry_is_gpkg(value: bytes) -> bool:
    """
    Return whether the input `value` is formatted as GeoPackage WKB
    (identified by the `b'GP'` magic-number prefix).
    """
    if not isinstance(value, bytes) or len(value) < 2:
        return False

    return value[0:2] == b'GP'
def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
    """
    Convert GeoPackage WKB into standard WKB by stripping the GPKG header.

    Parameters
    ----------
    gpkg_wkb_bytes: bytes
        The GeoPackage WKB byte string.

    Returns
    -------
    A tuple containing the standard WKB bytes, SRID, and flags.
    """
    if gpkg_wkb_bytes[0:2] != b'GP':
        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")

    ### Standard GPKG headers pack the SRID as a 4-byte int;
    ### fall back to a 2-byte SRID for short headers.
    try:
        _, _, _, flags, srid = struct.unpack('<ccBBi', gpkg_wkb_bytes[0:8])
    except struct.error:
        _, _, _, flags, srid = struct.unpack('<ccBBh', gpkg_wkb_bytes[0:6])

    ### Bits 1-3 of the flags byte encode which envelope (bounding box)
    ### follows the fixed 8-byte header.
    envelope_code = (flags >> 1) & 0x07
    envelope_size = {1: 32, 2: 48, 3: 48, 4: 64}.get(envelope_code, 0)
    return gpkg_wkb_bytes[8 + envelope_size:], srid, flags


def value_is_null(value: Any) -> bool:
    """
    Determine if a value is a null-like string.
    """
    return str(value).lower() in {'none', 'nan', 'na', 'nat', 'natz', '', '<na>'}


def none_if_null(value: Any) -> Any:
    """
    Return `None` if a value is a null-like string.
    """
    if value_is_null(value):
        return None
    return value


def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision.

    Raises
    ------
    A `ValueError` if the value cannot be represented at the requested precision.
    """
    ### Build a template such as '1111.11' whose exponent fixes the scale.
    template = Decimal(f"{'1' * (precision - scale)}.{'1' * scale}")
    try:
        return x.quantize(template, context=Context(prec=precision), rounding=ROUND_HALF_UP)
    except InvalidOperation:
        pass

    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")


def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.

    Returns
    -------
    A string of the input decimal or the input if not a Decimal.
    """
    if not isinstance(x, Decimal):
        return x

    ### `Decimal('NaN')` stringifies to a null-like token.
    if value_is_null(x):
        return None

    if precision and scale and quantize:
        x = quantize_decimal(x, precision, scale)

    ### 'f' formatting avoids scientific notation.
    return format(x, 'f')
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip timezone if `strip_utc` is `True`).

    `None` and integers pass through unchanged; strings are first parsed
    with `dateutil` (returned unchanged if unparseable).
    """
    if dt is None:
        return None

    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt)
        except Exception:
            return dt

    ### Duck-typed Series check (pandas or dask) without importing pandas eagerly.
    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
    if dt_is_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        ### Short-circuit if the Series already has the desired tz-awareness.
        if (
            pandas.api.types.is_datetime64_any_dtype(dt) and (
                (dt.dt.tz is not None and not strip_utc)
                or
                (dt.dt.tz is None and strip_utc)
            )
        ):
            return dt

        dt_series = to_datetime(dt, coerce_utc=False)
        if dt_series.dt.tz is None:
            dt_series = dt_series.dt.tz_localize(timezone.utc)
        if strip_utc:
            try:
                if dt_series.dt.tz is not None:
                    dt_series = dt_series.dt.tz_localize(None)
            except Exception:
                pass

        return dt_series

    ### Scalar datetime: naive values are assumed to already be UTC.
    if dt.tzinfo is None:
        if strip_utc:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    utc_dt = dt.astimezone(timezone.utc)
    if strip_utc:
        return utc_dt.replace(tzinfo=None)
    return utc_dt


def to_datetime(
    dt_val: Any,
    as_pydatetime: bool = False,
    coerce_utc: bool = True,
    precision_unit: Optional[str] = None,
) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The value to coerce to Pandas Timestamps.

    as_pydatetime: bool, default False
        If `True`, return a Python datetime object.

    coerce_utc: bool, default True
        If `True`, ensure the value has UTC tzinfo.

    precision_unit: Optional[str], default None
        If provided, enforce the provided precision unit.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    ### Use dask.dataframe in place of pandas when given a dask Series.
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd
    ### Only pin the dtype to an exact unit when the caller asked for one.
    enforce_precision = precision_unit is not None
    precision_unit = precision_unit or 'microsecond'
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
    if not precision_abbreviation:
        raise ValueError(f"Invalid precision '{precision_unit}'.")

    def parse(x: Any) -> Any:
        ### Slow-path fallback: dateutil parse, returning the input on failure.
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
        ### Exact unit match when precision is enforced,
        ### otherwise any datetime64 dtype with matching tz-awareness.
        dtype_check_against = (
            f"datetime64[{precision_abbreviation}, UTC]"
            if with_utc
            else f"datetime64[{precision_abbreviation}]"
        )
        return (
            dtype_to_check == dtype_check_against
            if enforce_precision
            else (
                dtype_to_check.startswith('datetime64[')
                and (
                    ('utc' in dtype_to_check.lower())
                    if with_utc
                    else ('utc' not in dtype_to_check.lower())
                )
            )
        )

    if isinstance(dt_val, pd.Timestamp):
        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
        return (
            coerce_timezone(dt_val_to_return)
            if coerce_utc
            else dt_val_to_return
        )

    if dt_is_series:
        ### Temporarily localize naive datetime Series to UTC so the
        ### `[unit, UTC]` astype below succeeds; restored before returning.
        changed_tz = False
        original_tz = None
        dtype = str(getattr(dt_val, 'dtype', 'object'))
        if (
            are_dtypes_equal(dtype, 'datetime')
            and 'utc' not in dtype.lower()
            and hasattr(dt_val, 'dt')
        ):
            original_tz = dt_val.dt.tz
            dt_val = dt_val.dt.tz_localize(timezone.utc)
            changed_tz = True
            dtype = str(getattr(dt_val, 'dtype', 'object'))
        try:
            new_dt_series = (
                dt_val
                if check_dtype(dtype, with_utc=True)
                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
            )
        except pd.errors.OutOfBoundsDatetime:
            ### Values outside the unit's representable range:
            ### retry at the next coarser precision.
            try:
                next_precision = get_next_precision_unit(true_precision_unit)
                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
            except Exception:
                new_dt_series = None
        except ValueError:
            new_dt_series = None
        except TypeError:
            ### NOTE(review): `new_dt_series` is unbound here when the astype above
            ### raised TypeError, so `check_dtype(...)` raises NameError and this
            ### branch always falls through to `None` (element-wise parse below).
            ### Presumably the intent was a tz-naive astype — confirm.
            try:
                new_dt_series = (
                    new_dt_series
                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
                )
            except Exception:
                new_dt_series = None

        if new_dt_series is None:
            ### Last resort: parse every element individually.
            new_dt_series = dt_val.apply(lambda x: parse(str(x)))

        if coerce_utc:
            return coerce_timezone(new_dt_series)

        if changed_tz:
            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
        return new_dt_series

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if new_dt_val.unit != precision_abbreviation:
            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        pass

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)
def serialize_bytes(data: bytes) -> str:
    """
    Return the given bytes as a base64-encoded string.
    Null-like non-bytes inputs are returned unchanged.
    """
    import base64
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return base64.b64encode(data).decode('utf-8')


def serialize_geometry(
    geom: Any,
    geometry_format: str = 'wkb_hex',
    srid: Optional[int] = None,
) -> Union[str, Dict[str, Any], bytes, None]:
    """
    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

    Parameters
    ----------
    geom: Any
        The potential geometry data to be serialized.

    geometry_format: str, default 'wkb_hex'
        The serialization format for geometry data.
        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.

    srid: Optional[int], default None
        If provided, use this as the source CRS when serializing to GeoJSON.

    Returns
    -------
    A string containing the geometry data, or bytes, or a dictionary, or None.
    """
    if value_is_null(geom):
        return None

    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
        'shapely', 'shapely.ops', 'pyproj', 'numpy',
        lazy=False,
    )
    if geometry_format == 'geojson':
        ### GeoJSON is always emitted in EPSG:4326; reproject if a source SRID is given.
        if srid:
            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
            geom = shapely_ops.transform(transformer.transform, geom)
        geojson_str = shapely.to_geojson(geom)
        return json.loads(geojson_str)

    ### Non-shapely objects fall back to their string representation.
    if not hasattr(geom, 'wkb_hex'):
        return str(geom)

    ### GPKG byte-order flag: 1 = little-endian, 0 = big-endian.
    byte_order = 1 if np.little_endian else 0

    if geometry_format.startswith("wkb"):
        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
    elif geometry_format == 'gpkg_wkb':
        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
        ### NOTE(review): 0x20 sets bit 5 of the flags byte for empty geometries;
        ### the GeoPackage spec's "empty" flag is commonly bit 4 (0x10) — confirm.
        flags = (
            ((byte_order & 0x01) | (0x20))
            if geom.is_empty
            else (byte_order & 0x01)
        )
        ### GPKG uses -1 for an unknown SRID.
        srid_val = srid or -1
        header = struct.pack(
            '<ccBBi',
            b'G', b'P',
            0,
            flags,
            srid_val
        )
        return header + wkb_data

    return shapely.to_wkt(geom)


def deserialize_geometry(geom_wkb: Union[str, bytes]):
    """
    Deserialize a WKB string into a shapely geometry object.
    """
    shapely = mrsm.attempt_import('shapely', lazy=False)
    return shapely.wkb.loads(geom_wkb)


def project_geometry(geom, srid: int, to_srid: int = 4326):
    """
    Project a shapely geometry object to a new CRS (SRID).
    """
    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
    return shapely_ops.transform(transformer.transform, geom)


def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
    """
    Given a serialized ASCII string of bytes data, return the original bytes.
    The input data may either be base64- or hex-encoded.

    Parameters
    ----------
    data: Optional[str]
        The string to be deserialized into bytes.
        May be base64- or hex-encoded (prefixed with `'\\x'`).

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded.
        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
        This will still strip the leading `'\\x'` prefix if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    """
    if not isinstance(data, str) and value_is_null(data):
        return data

    import binascii
    import base64

    is_hex = force_hex or data.startswith('\\x')

    if is_hex:
        if data.startswith('\\x'):
            data = data[2:]
        return binascii.unhexlify(data)

    return base64.b64decode(data)
def deserialize_base64(data: str) -> bytes:
    """
    Return the original bytestring from the given base64-encoded string.
    """
    import base64
    return base64.b64decode(data)


def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
    """
    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
    Null-like non-bytes inputs are returned unchanged.
    """
    import binascii
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')


def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).
    Naive datetimes are suffixed with `'Z'`; aware datetimes carry their own offset.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not hasattr(dt, 'isoformat'):
        return None

    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
    return dt.isoformat() + tz_suffix


def serialize_date(d: date) -> Union[str, None]:
    """
    Serialize a date object into its ISO representation.
    Returns `None` for objects without an `isoformat` method.
    """
    return d.isoformat() if hasattr(d, 'isoformat') else None


def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    """
    ### The order of these checks matters: `tzinfo` (datetime) is tested
    ### before `isoformat` (date) because datetimes have both attributes.
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    if hasattr(x, 'tzinfo'):
        return serialize_datetime(x)

    if hasattr(x, 'isoformat'):
        return serialize_date(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    if 'shapely' in str(type(x)):
        return serialize_geometry(x)

    if value_is_null(x):
        return None

    ### Containers are serialized recursively through this same function.
    if isinstance(x, (dict, list, tuple)):
        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))

    return str(x) if default_to_str else x


def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: Union[int, str] = 0,
) -> Tuple[str, Union[int, str, None]]:
    """
    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: Optional[str], default None
        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
    ('MultiLineString', 4326)
    >>> get_geometry_type_srid('geography')
    ('geometry', 0)
    >>> get_geometry_type_srid('geography[POINT]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[POINT, ESRI:102003]')
    ('Point', 'ESRI:102003')

    ### NOTE(review): some examples above show case-normalized type names
    ### (e.g. 'POINT' -> 'Point'), but this function returns the modifier
    ### text as-is — confirm whether normalization happens elsewhere.
    """
    from meerschaum.utils.misc import is_int
    ### NOTE: PostGIS syntax must also be parsed.
    dtype = dtype.replace('(', '[').replace(')', ']')
    bare_dtype = dtype.split('[', maxsplit=1)[0]
    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
    if not modifier:
        return default_type, default_srid

    ### Strip `key=value` syntax down to the value.
    parts = [
        part.split('=')[-1].strip()
        for part in modifier.split(',')
    ]
    parts_casted = [
        (
            int(part)
            if is_int(part)
            else part
        )
        for part in parts
    ]

    srid = default_srid
    geometry_type = default_type

    ### An integer or authority-style string (e.g. 'ESRI:102003') is the SRID;
    ### the first remaining string is the geometry type.
    for part in parts_casted:
        if isinstance(part, int) or ':' in str(part):
            srid = part
            break

    for part in parts_casted:
        if isinstance(part, str) and part != srid:
            geometry_type = part
            break

    return geometry_type, srid
def get_current_timestamp(
    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
    precision_interval: int = 1,
    round_to: str = 'down',
    as_pandas: bool = False,
    as_int: bool = False,
    _now: Union[datetime, int, None] = None,
) -> 'Union[datetime, pd.Timestamp, int]':
    """
    Return the current UTC timestamp to nanosecond precision.

    Parameters
    ----------
    precision_unit: str, default 'us'
        The precision of the timestamp to be returned.
        Valid values are the following:
        - `ns` / `nanosecond`
        - `us` / `microsecond`
        - `ms` / `millisecond`
        - `s` / `sec` / `second`
        - `m` / `min` / `minute`
        - `h` / `hr` / `hour`
        - `d` / `day`

    precision_interval: int, default 1
        Round the timestamp to the `precision_interval` units.
        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
        Note: `precision_interval` must be 1 when `precision='nanosecond'`.

    round_to: str, default 'down'
        The direction to which to round the timestamp.
        Available options are `down`, `up`, and `closest`.

    as_pandas: bool, default False
        If `True`, return a Pandas Timestamp.
        This is always true if `unit` is `nanosecond`.

    as_int: bool, default False
        If `True`, return the timestamp to an integer.
        Overrides `as_pandas`.

    _now: Union[datetime, int, None], default None
        Internal override for "now" (epoch nanoseconds for `nanosecond`
        precision, otherwise an aware `datetime`) — used for testing.

    Returns
    -------
    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.

    Examples
    --------
    >>> get_current_timestamp('ns')
    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
    >>> get_current_timestamp('ms')
    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
        from meerschaum.utils.misc import items_str
        raise ValueError(
            f"Unknown precision unit '{precision_unit}'. "
            "Accepted values are "
            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
        )

    ### Nanosecond results cannot be plain datetimes, so force pandas
    ### unless an integer was requested.
    if not as_int:
        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None

    if true_precision_unit == 'nanosecond':
        if precision_interval != 1:
            warn("`precision_interval` must be 1 for nanosecond precision.")
        now_ts = time.time_ns() if not isinstance(_now, int) else _now
        if as_int:
            return now_ts
        return pd.to_datetime(now_ts, unit='ns', utc=True)

    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
    delta = timedelta(**{true_precision_unit + 's': precision_interval})
    ### NOTE(review): `round_time` is not defined or imported in this chunk —
    ### presumably provided elsewhere in this module or package; confirm.
    rounded_now = round_time(now, delta, to=round_to)

    if as_int:
        ### Scale the epoch seconds into the requested unit.
        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])

    ts_val = (
        pd.to_datetime(rounded_now, utc=True)
        if as_pandas
        else rounded_now
    )

    if not as_pandas:
        return ts_val

    ### Only these units are valid pandas Timestamp storage units.
    as_unit_precisions = ('microsecond', 'millisecond', 'second')
    if true_precision_unit not in as_unit_precisions:
        return ts_val

    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])


def is_dtype_special(type_: str) -> bool:
    """
    Return whether a dtype should be treated as a special Meerschaum dtype.
    This is not the same as a Meerschaum alias.
    """
    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
    if true_type in (
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    ):
        return True

    if are_dtypes_equal(true_type, 'datetime'):
        return True

    if are_dtypes_equal(true_type, 'date'):
        return True

    ### Parameterized forms (e.g. 'numeric[10,2]', 'geometry[Point, 4326]').
    if true_type.startswith('numeric'):
        return True

    if true_type.startswith('bool'):
        return True

    if true_type.startswith('geometry'):
        return True

    if true_type.startswith('geography'):
        return True

    return False


def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the next precision string in order of value.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).

    decrease: bool, default True
        If `True` return the precision unit which is lower (e.g. `nanosecond` -> `millisecond`).
        If `False`, return the precision unit which is higher.

    Returns
    -------
    A `precision` string which is lower or higher than the given precision unit.
    """
    ### NOTE(review): the implementation of this function is truncated past the
    ### end of this chunk; the body continues in the rest of the file.
1245 1246 Examples 1247 -------- 1248 >>> get_next_precision_unit('nanosecond') 1249 'microsecond' 1250 >>> get_next_precision_unit('ms') 1251 'second' 1252 >>> get_next_precision_unit('hour', decrease=False) 1253 'minute' 1254 """ 1255 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1256 precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None) 1257 if not precision_scalar: 1258 raise ValueError(f"Invalid precision unit '{precision_unit}'.") 1259 1260 precisions = sorted( 1261 list(MRSM_PRECISION_UNITS_SCALARS), 1262 key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p] 1263 ) 1264 1265 precision_index = precisions.index(true_precision_unit) 1266 new_precision_index = precision_index + (-1 if decrease else 1) 1267 if new_precision_index < 0 or new_precision_index >= len(precisions): 1268 raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.") 1269 1270 return precisions[new_precision_index] 1271 1272 1273def round_time( 1274 dt: Optional[datetime] = None, 1275 date_delta: Optional[timedelta] = None, 1276 to: 'str' = 'down' 1277) -> datetime: 1278 """ 1279 Round a datetime object to a multiple of a timedelta. 1280 1281 Parameters 1282 ---------- 1283 dt: Optional[datetime], default None 1284 If `None`, grab the current UTC datetime. 1285 1286 date_delta: Optional[timedelta], default None 1287 If `None`, use a delta of 1 minute. 1288 1289 to: 'str', default 'down' 1290 Available options are `'up'`, `'down'`, and `'closest'`. 1291 1292 Returns 1293 ------- 1294 A rounded `datetime` object. 1295 1296 Examples 1297 -------- 1298 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 1299 datetime.datetime(2022, 1, 1, 12, 15) 1300 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 1301 datetime.datetime(2022, 1, 1, 12, 16) 1302 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 1303 datetime.datetime(2022, 1, 1, 12, 0) 1304 >>> round_time( 1305 ... 
datetime(2022, 1, 1, 12, 15, 57, 200), 1306 ... timedelta(hours=1), 1307 ... to = 'closest' 1308 ... ) 1309 datetime.datetime(2022, 1, 1, 12, 0) 1310 >>> round_time( 1311 ... datetime(2022, 1, 1, 12, 45, 57, 200), 1312 ... datetime.timedelta(hours=1), 1313 ... to = 'closest' 1314 ... ) 1315 datetime.datetime(2022, 1, 1, 13, 0) 1316 1317 """ 1318 from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP 1319 if date_delta is None: 1320 date_delta = timedelta(minutes=1) 1321 1322 if dt is None: 1323 dt = datetime.now(timezone.utc).replace(tzinfo=None) 1324 1325 def get_total_microseconds(td: timedelta) -> int: 1326 return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds 1327 1328 round_to_microseconds = get_total_microseconds(date_delta) 1329 if round_to_microseconds == 0: 1330 return dt 1331 1332 dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min 1333 dt_total_microseconds = get_total_microseconds(dt_delta_from_min) 1334 1335 dt_dec = Decimal(dt_total_microseconds) 1336 round_to_dec = Decimal(round_to_microseconds) 1337 1338 div = dt_dec / round_to_dec 1339 if to == 'down': 1340 num_intervals = div.to_integral_value(rounding=ROUND_DOWN) 1341 elif to == 'up': 1342 num_intervals = div.to_integral_value(rounding=ROUND_UP) 1343 else: 1344 num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP) 1345 1346 rounded_dt_total_microseconds = num_intervals * round_to_dec 1347 adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds 1348 1349 return dt + timedelta(microseconds=adjustment_microseconds)
def to_pandas_dtype(dtype: str) -> str:
    """
    Cast a supported Meerschaum dtype to a Pandas dtype.
    """
    direct_match = MRSM_PD_DTYPES.get(dtype, None)
    if direct_match is not None:
        return direct_match

    canonical = MRSM_ALIAS_DTYPES.get(dtype, None)
    if canonical is not None:
        return MRSM_PD_DTYPES[canonical]

    ### Parameterized spellings (e.g. `numeric(10,2)`, `geometry[Point, 4326]`)
    ### map to their base Meerschaum dtype.
    for prefix in ('numeric', 'geometry', 'geography'):
        if dtype.startswith(prefix):
            return MRSM_PD_DTYPES[prefix]

    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
    ### treat it as a SQL db type.
    if dtype.split(' ')[0].isupper():
        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
        return get_pd_type_from_db_type(dtype)

    from meerschaum.utils.packages import attempt_import
    _ = attempt_import('pyarrow', lazy=False)
    pandas = attempt_import('pandas', lazy=False)

    try:
        return str(pandas.api.types.pandas_dtype(dtype))
    except Exception:
        warn(
            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
            + f"{traceback.format_exc()}",
            stack=False,
        )
        return 'object'
Cast a supported Meerschaum dtype to a Pandas dtype.
def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    """
    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
        ### NOTE: Compare the full sorted key lists (previously a pairwise `zip`,
        ### which truncates at the shorter dict and wrongly equated dicts
        ### whose extra keys were ignored).
        lkeys = sorted(str(key) for key in ldtype)
        rkeys = sorted(str(key) for key in rdtype)
        if lkeys != rkeys:
            return False
        return all(
            are_dtypes_equal(ldtype[key], rdtype[key])
            for key in lkeys
        )

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    if ldtype in MRSM_ALIAS_DTYPES:
        ldtype = MRSM_ALIAS_DTYPES[ldtype]

    if rdtype in MRSM_ALIAS_DTYPES:
        rdtype = MRSM_ALIAS_DTYPES[rdtype]

    json_dtypes = ('json', 'object')
    if ldtype in json_dtypes and rdtype in json_dtypes:
        return True

    numeric_dtypes = ('numeric', 'decimal', 'object')
    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
        return True

    uuid_dtypes = ('uuid', 'object')
    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
        return True

    bytes_dtypes = ('bytes', 'object', 'binary')
    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
        return True

    geometry_dtypes = ('geometry', 'object', 'geography')
    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
        return True

    if ldtype.lower() == rdtype.lower():
        return True

    ### Any dtype containing 'datetime' or 'timestamp' counts as a datetime.
    datetime_dtypes = ('datetime', 'timestamp')
    ldtype_found_dt_prefix = False
    rdtype_found_dt_prefix = False
    for dt_prefix in datetime_dtypes:
        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
        return True

    string_dtypes = ('str', 'string', 'object')
    if ldtype in string_dtypes and rdtype in string_dtypes:
        return True

    int_dtypes = (
        'int', 'int64', 'int32', 'int16', 'int8',
        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
    )
    int_substrings = ('int',)
    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
        return True
    for substring in int_substrings:
        if substring in ldtype.lower() and substring in rdtype.lower():
            return True

    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
        return True

    bool_dtypes = ('bool', 'boolean')
    if ldtype in bool_dtypes and rdtype in bool_dtypes:
        return True

    date_dtypes = (
        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
    )
    if ldtype in date_dtypes and rdtype in date_dtypes:
        return True

    return False
Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.
Parameters
- ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
- rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
- A `bool` indicating whether the two dtypes are to be considered equivalent.
def is_dtype_numeric(dtype: str) -> bool:
    """
    Determine whether a given `dtype` string
    should be considered compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    A bool indicating the dtype is compatible with `numeric`.
    """
    lowered = dtype.lower()
    return any(
        token in lowered
        for token in ('numeric', 'float', 'double', 'int')
    )
Determine whether a given dtype string
should be considered compatible with the Meerschaum dtype numeric.
Parameters
- dtype (str): The pandas-like dtype string.
Returns
- A `bool` indicating the dtype is compatible with `numeric`.
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    """
    ### Quantization requires all three of `quantize`, `precision`, and `scale`.
    should_quantize = quantize and bool(precision) and bool(scale)

    if isinstance(value, Decimal):
        return quantize_decimal(value, precision, scale) if should_quantize else value

    try:
        if value_is_null(value):
            return Decimal('NaN')

        as_decimal = Decimal(str(value))
        if should_quantize:
            return quantize_decimal(as_decimal, precision, scale)
        return as_decimal
    except Exception:
        return value
Given a value, attempt to coerce it into a numeric (Decimal).
Parameters
- value (Any): The value to be cast to a Decimal.
- quantize (bool, default False): If `True`, quantize the decimal to the specified precision and scale.
- precision (Optional[int], default None): If `quantize` is `True`, use this precision.
- scale (Optional[int], default None): If `quantize` is `True`, use this scale.
Returns
- A `Decimal` if possible, or `value`.
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a UUID (`uuid4`).
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        if value_is_null(value):
            return None
        return uuid.UUID(str(value))
    except Exception:
        ### Return the original value when it cannot be parsed as a UUID.
        return value
Given a value, attempt to coerce it into a UUID (uuid4).
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a bytestring.
    """
    if isinstance(value, bytes):
        return value
    try:
        if value_is_null(value):
            return None
        return deserialize_bytes_string(str(value))
    except Exception:
        ### Return the original value when it cannot be decoded into bytes.
        return value
Given a value, attempt to coerce it into a bytestring.
def attempt_cast_to_geometry(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.

    Accepts scalars (WKT / WKB / GPKG-WKB / GeoJSON dict or list) as well as
    pandas-like Series, which are converted to `geopandas.GeoSeries`.
    Values which cannot be parsed are returned unchanged.
    """
    ### Inspect the type name rather than importing pandas up-front.
    typ = str(type(value))
    if 'pandas' in typ and 'Series' in typ:
        if 'GeoSeries' in typ:
            return value

        gpd = mrsm.attempt_import('geopandas', lazy=False)
        if len(value) == 0:
            return gpd.GeoSeries([])

        ### Use the first non-null element as a sample to sniff the format.
        ix = value.first_valid_index()
        if ix is None:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                ### Fall back to casting element-by-element.
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        sample_val = value[ix]
        sample_typ = str(type(sample_val))
        if 'shapely' in sample_typ:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        sample_is_gpkg = geometry_is_gpkg(sample_val)
        if sample_is_gpkg:
            ### Strip the GeoPackage header from each element, leaving standard WKB.
            try:
                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
        try:
            return (
                gpd.GeoSeries.from_wkt(value)
                if sample_is_wkt
                else gpd.GeoSeries.from_wkb(value)
            )
        except Exception:
            traceback.print_exc()
            return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

    ### Scalar handling below; shapely objects pass through untouched.
    if 'shapely' in typ:
        return value

    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
        'shapely',
        'shapely.wkt',
        'shapely.wkb',
        lazy=False,
    )

    ### Dicts and lists are presumed to be GeoJSON-like structures.
    if isinstance(value, (dict, list)):
        try:
            return shapely.from_geojson(json.dumps(value))
        except Exception:
            return value

    value_is_gpkg = geometry_is_gpkg(value)
    if value_is_gpkg:
        try:
            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
            return shapely_wkb.loads(wkb_data)
        except Exception:
            return value

    ### `None` means the value looks like neither WKT nor WKB.
    value_is_wkt = geometry_is_wkt(value)
    if value_is_wkt is None:
        return value

    try:
        return (
            shapely_wkt.loads(value)
            if value_is_wkt
            else shapely_wkb.loads(value)
        )
    except Exception:
        pass

    return value
Given a value, attempt to coerce it into a shapely (geometry) object.
def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
    """
    Determine whether an input value should be treated as WKT or WKB geometry data.

    Parameters
    ----------
    value: Union[str, bytes]
        The input data to be parsed into geometry data.

    Returns
    -------
    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
    Return `None` if `value` should be parsed as neither.
    """
    import re
    ### Raw bytes are always treated as WKB.
    if isinstance(value, bytes):
        return False
    if not isinstance(value, str):
        return None

    wkt_pattern = (
        r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING'
        r'|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
    )
    if re.match(wkt_pattern, value, re.IGNORECASE):
        return True

    ### An even-length, all-hex string is presumed to be hex-encoded WKB.
    hex_digits = set('0123456789ABCDEFabcdef')
    if len(value) % 2 == 0 and all(ch in hex_digits for ch in value):
        return False

    return None
Determine whether an input value should be treated as WKT or WKB geometry data.
Parameters
- value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
- A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
- Return `None` if `value` should be parsed as neither.
def geometry_is_gpkg(value: bytes) -> bool:
    """
    Return whether the input `value` is formatted as GeoPackage WKB.
    """
    ### GeoPackage binary blobs always begin with the two magic bytes `GP`.
    return isinstance(value, bytes) and value.startswith(b'GP')
Return whether the input value is formatted as GeoPackage WKB.
def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
    """
    Converts GeoPackage WKB to standard WKB by removing the header.

    Parameters
    ----------
    gpkg_wkb_bytes: bytes
        The GeoPackage WKB byte string.

    Returns
    -------
    A tuple containing the standard WKB bytes, SRID, and flags.
    """
    if gpkg_wkb_bytes[0:2] != b'GP':
        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")

    ### Header layout: magic 'G','P', version byte, flags byte, SRID.
    try:
        _, _, _, flags, srid = struct.unpack('<ccBBi', gpkg_wkb_bytes[0:8])
    except struct.error:
        ### Fall back to a 2-byte SRID when the blob is too short for int32.
        _, _, _, flags, srid = struct.unpack('<ccBBh', gpkg_wkb_bytes[0:6])

    ### Bits 1-3 of the flags encode which envelope (bounding box) follows the header.
    envelope_code = (flags >> 1) & 0x07
    bytes_per_envelope = {
        0: 0,
        1: 32,
        2: 48,
        3: 48,
        4: 64,
    }
    total_header_length = 8 + bytes_per_envelope.get(envelope_code, 0)
    return gpkg_wkb_bytes[total_header_length:], srid, flags
Converts GeoPackage WKB to standard WKB by removing the header.
Parameters
- gpkg_wkb_bytes (bytes): The GeoPackage WKB byte string.
Returns
- A tuple containing the standard WKB bytes, SRID, and flags.
def value_is_null(value: Any) -> bool:
    """
    Determine if a value is a null-like string.
    """
    ### Compare the stringified value against known null-like representations.
    null_strings = {'none', 'nan', 'na', 'nat', 'natz', '', '<na>'}
    return str(value).lower() in null_strings
Determine if a value is a null-like string.
def none_if_null(value: Any) -> Any:
    """
    Return `None` if a value is a null-like string.
    """
    if value_is_null(value):
        return None
    return value
Return None if a value is a null-like string.
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision.
    """
    ### Build a quantum like `111.11` whose exponent encodes the target scale.
    integer_digits = '1' * (precision - scale)
    fractional_digits = '1' * scale
    quantum = Decimal(f"{integer_digits}.{fractional_digits}")
    try:
        return x.quantize(quantum, context=Context(prec=precision), rounding=ROUND_HALF_UP)
    except InvalidOperation:
        pass

    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
Quantize a given Decimal to a known scale and precision.
Parameters
- x (Decimal): The `Decimal` to be quantized.
- precision (int): The total number of significant digits.
- scale (int): The number of significant digits after the decimal point.
Returns
- A `Decimal` quantized to the specified scale and precision.
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.

    Returns
    -------
    A string of the input decimal or the input if not a Decimal.
    """
    if not isinstance(x, Decimal):
        return x

    if value_is_null(x):
        return None

    needs_quantize = quantize and bool(scale) and bool(precision)
    serializable = quantize_decimal(x, precision, scale) if needs_quantize else x

    ### `:f` avoids scientific notation in the serialized string.
    return f"{serializable:f}"
Return a quantized string of an input decimal.
Parameters
- x (Any): The potential decimal to be serialized.
- quantize (bool, default False): If `True`, quantize the incoming Decimal to the specified scale and precision before serialization.
- precision (Optional[int], default None): The precision of the decimal to be quantized.
- scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
- A string of the input decimal or the input if not a Decimal.
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
    """
    if dt is None:
        return None

    ### Integer (epoch) values pass through untouched.
    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt)
        except Exception:
            ### Unparseable strings are returned as-is.
            return dt

    ### Duck-type check for pandas- and dask-like series.
    if hasattr(dt, 'dtype') and hasattr(dt, '__module__'):
        pandas = mrsm.attempt_import('pandas', lazy=False)

        is_dt_dtype = pandas.api.types.is_datetime64_any_dtype(dt)
        already_coerced = (
            (dt.dt.tz is not None and not strip_utc)
            or (dt.dt.tz is None and strip_utc)
        ) if is_dt_dtype else False
        if already_coerced:
            return dt

        series = to_datetime(dt, coerce_utc=False)
        if series.dt.tz is None:
            series = series.dt.tz_localize(timezone.utc)
        if strip_utc:
            try:
                if series.dt.tz is not None:
                    series = series.dt.tz_localize(None)
            except Exception:
                pass

        return series

    ### Scalar datetime handling: treat naive values as already-UTC.
    if dt.tzinfo is None:
        return dt if strip_utc else dt.replace(tzinfo=timezone.utc)

    as_utc = dt.astimezone(timezone.utc)
    return as_utc.replace(tzinfo=None) if strip_utc else as_utc
Given a datetime, pandas Timestamp or Series of Timestamp,
return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
def to_datetime(
    dt_val: Any,
    as_pydatetime: bool = False,
    coerce_utc: bool = True,
    precision_unit: Optional[str] = None,
) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The value to coerce to Pandas Timestamps.

    as_pydatetime: bool, default False
        If `True`, return a Python datetime object.

    coerce_utc: bool, default True
        If `True`, ensure the value has UTC tzinfo.

    precision_unit: Optional[str], default None
        If provided, enforce the provided precision unit.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    ### Route series operations through dask when the input is a dask object.
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd
    ### Only enforce an exact dtype when the caller supplied a precision.
    enforce_precision = precision_unit is not None
    precision_unit = precision_unit or 'microsecond'
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
    if not precision_abbreviation:
        raise ValueError(f"Invalid precision '{precision_unit}'.")

    def parse(x: Any) -> Any:
        # Last-resort parser: return the input unchanged if dateutil cannot parse it.
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
        # With an enforced precision, require an exact dtype string match;
        # otherwise accept any `datetime64[...]` dtype with the expected tz-awareness.
        dtype_check_against = (
            f"datetime64[{precision_abbreviation}, UTC]"
            if with_utc
            else f"datetime64[{precision_abbreviation}]"
        )
        return (
            dtype_to_check == dtype_check_against
            if enforce_precision
            else (
                dtype_to_check.startswith('datetime64[')
                and (
                    ('utc' in dtype_to_check.lower())
                    if with_utc
                    else ('utc' not in dtype_to_check.lower())
                )
            )
        )

    if isinstance(dt_val, pd.Timestamp):
        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
        return (
            coerce_timezone(dt_val_to_return)
            if coerce_utc
            else dt_val_to_return
        )

    if dt_is_series:
        changed_tz = False
        original_tz = None
        dtype = str(getattr(dt_val, 'dtype', 'object'))
        if (
            are_dtypes_equal(dtype, 'datetime')
            and 'utc' not in dtype.lower()
            and hasattr(dt_val, 'dt')
        ):
            # Temporarily localize naive datetime series to UTC for the cast below;
            # restored further down when `coerce_utc` is False.
            original_tz = dt_val.dt.tz
            dt_val = dt_val.dt.tz_localize(timezone.utc)
            changed_tz = True
            dtype = str(getattr(dt_val, 'dtype', 'object'))
        try:
            new_dt_series = (
                dt_val
                if check_dtype(dtype, with_utc=True)
                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
            )
        except pd.errors.OutOfBoundsDatetime:
            # Values overflow this precision: retry one step coarser.
            try:
                next_precision = get_next_precision_unit(true_precision_unit)
                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
            except Exception:
                new_dt_series = None
        except ValueError:
            new_dt_series = None
        except TypeError:
            try:
                # NOTE(review): `new_dt_series` is likely unbound here (the astype
                # above raised before assigning it), so this expression raises
                # NameError, which the `except Exception` below converts to the
                # element-wise parse fallback. Presumably a tz-naive astype of
                # `dt_val` was intended — TODO confirm.
                new_dt_series = (
                    new_dt_series
                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
                )
            except Exception:
                new_dt_series = None

        if new_dt_series is None:
            # Element-wise fallback for values the vectorized casts cannot handle.
            new_dt_series = dt_val.apply(lambda x: parse(str(x)))

        if coerce_utc:
            return coerce_timezone(new_dt_series)

        if changed_tz:
            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
        return new_dt_series

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if new_dt_val.unit != precision_abbreviation:
            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        pass

    ### Scalar fallback: dateutil parse, then optional UTC coercion.
    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)
Wrap pd.to_datetime() and add support for out-of-bounds values.
Parameters
- dt_val (Any): The value to coerce to Pandas Timestamps.
- as_pydatetime (bool, default False): If `True`, return a Python datetime object.
- coerce_utc (bool, default True): If `True`, ensure the value has UTC tzinfo.
- precision_unit (Optional[str], default None): If provided, enforce the provided precision unit.
def serialize_bytes(data: bytes) -> str:
    """
    Return the given bytes as a base64-encoded string.
    """
    import base64
    ### Null-like non-bytes values pass through unchanged.
    is_null_non_bytes = (not isinstance(data, bytes)) and value_is_null(data)
    if is_null_non_bytes:
        return data
    return base64.b64encode(data).decode('utf-8')
Return the given bytes as a base64-encoded string.
def serialize_geometry(
    geom: Any,
    geometry_format: str = 'wkb_hex',
    srid: Optional[int] = None,
) -> Union[str, Dict[str, Any], bytes, None]:
    """
    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

    Parameters
    ----------
    geom: Any
        The potential geometry data to be serialized.

    geometry_format: str, default 'wkb_hex'
        The serialization format for geometry data.
        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.

    srid: Optional[int], default None
        If provided, use this as the source CRS when serializing to GeoJSON.

    Returns
    -------
    A string containing the geometry data, or bytes, or a dictionary, or None.
    """
    if value_is_null(geom):
        return None

    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
        'shapely', 'shapely.ops', 'pyproj', 'numpy',
        lazy=False,
    )
    if geometry_format == 'geojson':
        if srid:
            ### Reproject from the source SRID into EPSG:4326 before emitting GeoJSON.
            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
            geom = shapely_ops.transform(transformer.transform, geom)
        geojson_str = shapely.to_geojson(geom)
        return json.loads(geojson_str)

    ### Not a shapely-like object; fall back to its string representation.
    if not hasattr(geom, 'wkb_hex'):
        return str(geom)

    ### Bit 0 of the GPKG flags byte records the writing machine's byte order.
    byte_order = 1 if np.little_endian else 0

    if geometry_format.startswith("wkb"):
        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
    elif geometry_format == 'gpkg_wkb':
        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
        ### 0x20 marks an empty geometry in the GPKG flags byte.
        flags = (
            ((byte_order & 0x01) | (0x20))
            if geom.is_empty
            else (byte_order & 0x01)
        )
        srid_val = srid or -1
        ### 8-byte GPKG header: magic 'G','P', version 0, flags, little-endian int32 SRID.
        header = struct.pack(
            '<ccBBi',
            b'G', b'P',
            0,
            flags,
            srid_val
        )
        return header + wkb_data

    ### Any other format falls back to WKT.
    return shapely.to_wkt(geom)
Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.
Parameters
- geom (Any): The potential geometry data to be serialized.
- geometry_format (str, default 'wkb_hex'):
The serialization format for geometry data.
Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
- srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON.
Returns
- A string containing the geometry data, or bytes, or a dictionary, or None.
def deserialize_geometry(geom_wkb: Union[str, bytes]):
    """
    Deserialize a WKB string into a shapely geometry object.
    """
    shapely_pkg = mrsm.attempt_import('shapely', lazy=False)
    ### `shapely.wkb.loads` accepts both hex strings and raw WKB bytes.
    return shapely_pkg.wkb.loads(geom_wkb)
Deserialize a WKB string into a shapely geometry object.
def project_geometry(geom, srid: int, to_srid: int = 4326):
    """
    Project a shapely geometry object to a new CRS (SRID).
    """
    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
    source_crs = f"EPSG:{srid}"
    target_crs = f"EPSG:{to_srid}"
    transformer = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True)
    return shapely_ops.transform(transformer.transform, geom)
Project a shapely geometry object to a new CRS (SRID).
def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
    """
    Given a serialized ASCII string of bytes data, return the original bytes.
    The input data may either be base64- or hex-encoded.

    Parameters
    ----------
    data: Optional[str]
        The string to be deserialized into bytes.
        May be base64- or hex-encoded (prefixed with `'\\x'`).

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded.
        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
        This will still strip the leading `'\\x'` prefix if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    """
    ### Null-like non-string values pass through unchanged.
    if not isinstance(data, str) and value_is_null(data):
        return data

    import base64
    import binascii

    hex_prefix = '\\x'
    if force_hex or data.startswith(hex_prefix):
        payload = data[len(hex_prefix):] if data.startswith(hex_prefix) else data
        return binascii.unhexlify(payload)

    return base64.b64decode(data)
Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.
Parameters
- data (Optional[str]):
The string to be deserialized into bytes.
May be base64- or hex-encoded (prefixed with
'\x'). - force_hex (bool = False):
If
`True`, treat the input string as hex-encoded. If `data` does not begin with the prefix `'\x'`, set `force_hex` to `True`. This will still strip the leading `'\x'` prefix if present.
Returns
- The original bytes used to produce the encoded string
data.
def deserialize_base64(data: str) -> bytes:
    """
    Return the original bytestring from the given base64-encoded string.
    """
    from base64 import b64decode
    return b64decode(data)
Return the original bytestring from the given base64-encoded string.
def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
    """
    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
    """
    import binascii
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    hex_payload = binascii.hexlify(data).decode('utf-8')
    return ('\\x' + hex_payload) if with_prefix else hex_payload
Return the given bytes as a hex string for PostgreSQL's BYTEA type.
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Naive datetimes are suffixed with `'Z'`; timezone-aware datetimes
    keep their ISO offset and get no suffix.
    Returns `None` for objects without an `isoformat` method.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_value)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    ### NOTE: fixed the doctest above — it referenced a nonexistent
    ### `json_serialize_datetime`; the module's serializer is `json_serialize_value`.
    if not hasattr(dt, 'isoformat'):
        return None

    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
    return dt.isoformat() + tz_suffix
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def serialize_date(d: date) -> Union[str, None]:
    """
    Serialize a date object into its ISO representation.
    Returns `None` for objects without an `isoformat` method.
    """
    if not hasattr(d, 'isoformat'):
        return None
    return d.isoformat()
Serialize a date object into its ISO representation.
def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    """
    ### Ordered dispatch: the first matching predicate wins.
    ### `tzinfo` (datetimes) must be tested before `isoformat` (dates),
    ### since datetime objects have both attributes.
    dispatch = (
        (lambda v: isinstance(v, (mrsm.Pipe, mrsm.connectors.Connector)), lambda v: v.meta),
        (lambda v: hasattr(v, 'tzinfo'), serialize_datetime),
        (lambda v: hasattr(v, 'isoformat'), serialize_date),
        (lambda v: isinstance(v, bytes), serialize_bytes),
        (lambda v: isinstance(v, Decimal), serialize_decimal),
        (lambda v: 'shapely' in str(type(v)), serialize_geometry),
        (value_is_null, lambda v: None),
        (
            lambda v: isinstance(v, (dict, list, tuple)),
            lambda v: json.dumps(v, default=json_serialize_value, separators=(',', ':')),
        ),
    )
    for predicate, serializer in dispatch:
        if predicate(x):
            return serializer(x)
    return str(x) if default_to_str else x
Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
Parameters
- x (Any): The value to serialize.
- default_to_str (bool, default True):
If
`True`, return a string of `x` if `x` is not a designated type. Otherwise return `x`.
Returns
- A serialized version of x, or x.
def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: Union[int, str] = 0,
) -> Tuple[str, Union[int, str, None]]:
    """
    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: str, default 'geometry'
        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

    default_type: str, default 'geometry'
        The geometry type returned when the modifier does not specify one.

    default_srid: Union[int, str], default 0
        The SRID returned when the modifier does not specify one.

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.

    NOTE(review): some earlier doctests claimed an SRID of 4326 for the
    no-modifier cases, contradicting `default_srid=0`; they are corrected below.
    The case-normalized outputs (e.g. `'MultiLineString'` from `MULTILINESTRING`)
    are not produced by the code visible here, which returns the modifier's
    casing as-is — confirm whether normalization happens elsewhere.

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geography')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[POINT, ESRI:102003]')
    ('Point', 'ESRI:102003')
    """
    from meerschaum.utils.misc import is_int
    ### NOTE: PostGIS syntax must also be parsed.
    ### Normalize PostGIS-style parentheses to bracket syntax.
    dtype = dtype.replace('(', '[').replace(')', ']')
    bare_dtype = dtype.split('[', maxsplit=1)[0]
    ### Everything between the brackets, e.g. 'Point, 4326'.
    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
    if not modifier:
        return default_type, default_srid

    ### Accept 'key=value' pairs by keeping only the value side.
    parts = [
        part.split('=')[-1].strip()
        for part in modifier.split(',')
    ]
    parts_casted = [
        (
            int(part)
            if is_int(part)
            else part
        )
        for part in parts
    ]

    srid = default_srid
    geometry_type = default_type

    ### An integer part, or an authority code like 'ESRI:102003', is the SRID.
    for part in parts_casted:
        if isinstance(part, int) or ':' in str(part):
            srid = part
            break

    ### The first string part that is not the SRID is the geometry type.
    for part in parts_casted:
        if isinstance(part, str) and part != srid:
            geometry_type = part
            break

    return geometry_type, srid
Given the specified geometry dtype, return a tuple in the form (type, SRID).
Parameters
dtype (Optional[str], default None): Optionally provide a specific
`geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). You may specify a supported `shapely` geometry type and an SRID in the dtype modifier: `Point`, `LineString`, `LinearRing`, `Polygon`, `MultiPoint`, `MultiLineString`, `MultiPolygon`, `GeometryCollection`
Returns
- A tuple in the form (type, SRID).
- Defaults to
(default_type, default_srid).
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 4326)
>>> get_geometry_type_srid('geometry[]')
('geometry', 4326)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MultiLineString', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 0)
>>> get_geometry_type_srid('geography[POINT]')
('Point', 0)
>>> get_geometry_type_srid('geometry[POINT, ESRI:102003]')
('Point', 'ESRI:102003')
def get_current_timestamp(
    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
    precision_interval: int = 1,
    round_to: str = 'down',
    as_pandas: bool = False,
    as_int: bool = False,
    _now: Union[datetime, int, None] = None,
) -> 'Union[datetime, pd.Timestamp, int]':
    """
    Return the current UTC timestamp, rounded to the given precision unit.

    Parameters
    ----------
    precision_unit: str
        The precision of the timestamp to be returned.
        Defaults to the static-config default precision unit.
        Valid values are the following:
        - `ns` / `nanosecond`
        - `us` / `microsecond`
        - `ms` / `millisecond`
        - `s` / `sec` / `second`
        - `m` / `min` / `minute`
        - `h` / `hr` / `hour`
        - `d` / `day`

    precision_interval: int, default 1
        Round the timestamp to the `precision_interval` units.
        For example, `precision_unit='minute'` and `precision_interval=15` will round
        to 15-minute intervals.
        Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`.

    round_to: str, default 'down'
        The direction to which to round the timestamp.
        Available options are `down`, `up`, and `closest`.

    as_pandas: bool, default False
        If `True`, return a Pandas Timestamp.
        This is always true if `precision_unit` is `nanosecond`.

    as_int: bool, default False
        If `True`, return the timestamp as an integer.
        Overrides `as_pandas`.

    _now: Union[datetime, int, None], default None
        Internal override: use this value instead of the current time
        (an `int` of epoch nanoseconds for nanosecond precision,
        a `datetime` for all other units).

    Returns
    -------
    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.

    Examples
    --------
    >>> get_current_timestamp('ns')
    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
    >>> get_current_timestamp('ms')
    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
    """
    ### Resolve aliases ('ms' -> 'millisecond') before validating.
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
        from meerschaum.utils.misc import items_str
        raise ValueError(
            f"Unknown precision unit '{precision_unit}'. "
            "Accepted values are "
            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
        )

    if not as_int:
        ### Nanosecond precision cannot be represented by stdlib datetime,
        ### so pandas is forced for that unit.
        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
        pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None

    if true_precision_unit == 'nanosecond':
        if precision_interval != 1:
            warn("`precision_interval` must be 1 for nanosecond precision.")
        now_ts = time.time_ns() if not isinstance(_now, int) else _now
        if as_int:
            return now_ts
        return pd.to_datetime(now_ts, unit='ns', utc=True)

    ### All sub-nanosecond units go through datetime + round_time().
    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
    delta = timedelta(**{true_precision_unit + 's': precision_interval})
    rounded_now = round_time(now, delta, to=round_to)

    if as_int:
        ### Scale epoch seconds into the requested unit (e.g. *1000 for ms).
        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])

    ts_val = (
        pd.to_datetime(rounded_now, utc=True)
        if as_pandas
        else rounded_now
    )

    if not as_pandas:
        return ts_val

    ### Only these units map to pandas Timestamp storage units ('us', 'ms', 's').
    as_unit_precisions = ('microsecond', 'millisecond', 'second')
    if true_precision_unit not in as_unit_precisions:
        return ts_val

    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
Return the current UTC timestamp to nanosecond precision.
Parameters
- precision_unit (str, default 'us'):
The precision of the timestamp to be returned.
Valid values are the following:
-
ns/nanosecond-us/microsecond-ms/millisecond-s/sec/second-m/min/minute-h/hr/hour-d/day - precision_interval (int, default 1):
Round the timestamp to the
`precision_interval` units. For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals. Note: `precision_interval` must be 1 when `precision='nanosecond'`. - round_to (str, default 'down'):
The direction to which to round the timestamp.
Available options are
down,up, andclosest. - as_pandas (bool, default False):
If
True, return a Pandas Timestamp. This is always true ifunitisnanosecond. - as_int (bool, default False):
If
True, return the timestamp to an integer. Overridesas_pandas.
Returns
- A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms')
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
def is_dtype_special(type_: str) -> bool:
    """
    Return whether a dtype should be treated as a special Meerschaum dtype.
    This is not the same as a Meerschaum alias.
    """
    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
    special_types = (
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    )
    if true_type in special_types:
        return True

    ### Catch dtype variants (e.g. 'datetime64[ns, UTC]').
    if are_dtypes_equal(true_type, 'datetime') or are_dtypes_equal(true_type, 'date'):
        return True

    ### Catch modifier syntax, e.g. 'numeric[10,2]' or 'geometry[Point, 4326]'.
    return true_type.startswith(('numeric', 'bool', 'geometry', 'geography'))
Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.
def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the next precision string in order of value.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).

    decrease: bool, default True
        If `True` return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the precision unit which is higher.

    Returns
    -------
    A `precision` string which is lower or higher than the given precision unit.

    Raises
    ------
    ValueError
        If `precision_unit` is unknown or there is no unit in the requested direction.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    ### NOTE: fixed docs — 'defaul' typo, and the prose example previously said
    ### `nanosecond` -> `millisecond`, contradicting the doctest below.
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
    if precision_scalar is None:
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    ### Units sorted from coarsest (day) to finest (nanosecond) by their
    ### units-per-second scalar; 'decrease' steps one unit coarser.
    precisions = sorted(
        MRSM_PRECISION_UNITS_SCALARS,
        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p],
    )

    precision_index = precisions.index(true_precision_unit)
    new_precision_index = precision_index + (-1 if decrease else 1)
    if new_precision_index < 0 or new_precision_index >= len(precisions):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return precisions[new_precision_index]
Get the next precision string in order of value.
Parameters
- precision_unit (str):
The precision string (
`'nanosecond'`, `'ms'`, etc.). - decrease (bool, default True):
If
Truereturn the precision unit which is lower (e.g.nanosecond->millisecond). IfFalse, return the precision unit which is higher.
Returns
- A
precisionstring which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...     datetime(2022, 1, 1, 12, 15, 57, 200),
    ...     timedelta(hours=1),
    ...     to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...     datetime(2022, 1, 1, 12, 45, 57, 200),
    ...     timedelta(hours=1),
    ...     to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    ### NOTE: fixed the last doctest — it called `datetime.timedelta(hours=1)`,
    ### which would raise AttributeError (`timedelta` is imported directly).
    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    def get_total_microseconds(td: timedelta) -> int:
        """Return the exact number of microseconds spanned by `td`."""
        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds

    round_to_microseconds = get_total_microseconds(date_delta)
    if round_to_microseconds == 0:
        ### A zero-length interval cannot be rounded to; return unchanged.
        return dt

    ### Measure from `datetime.min` so arbitrary intervals (e.g. 7 minutes)
    ### align consistently regardless of the input's absolute value.
    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)

    ### Decimal avoids float rounding error on large microsecond counts.
    dt_dec = Decimal(dt_total_microseconds)
    round_to_dec = Decimal(round_to_microseconds)

    div = dt_dec / round_to_dec
    if to == 'down':
        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
    elif to == 'up':
        num_intervals = div.to_integral_value(rounding=ROUND_UP)
    else:
        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)

    rounded_dt_total_microseconds = num_intervals * round_to_dec
    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds

    ### Apply the adjustment to the original dt (preserves tzinfo if present).
    return dt + timedelta(microseconds=adjustment_microseconds)
Round a datetime object to a multiple of a timedelta.
Parameters
- dt (Optional[datetime], default None):
If
None, grab the current UTC datetime. - date_delta (Optional[timedelta], default None):
If
None, use a delta of 1 minute. - to ('str', default 'down'):
Available options are
'up','down', and'closest'.
Returns
- A rounded
datetimeobject.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 15, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 45, 57, 200),
... datetime.timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)