meerschaum.utils.dtypes
Utility functions for working with data types.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with data types.
"""

import traceback
import json
import uuid
import time
import struct
from datetime import timezone, datetime, date, timedelta
from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple
from meerschaum.utils.warnings import warn
from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG

MRSM_ALIAS_DTYPES: Dict[str, str] = {
    'decimal': 'numeric',
    'Decimal': 'numeric',
    'number': 'numeric',
    'jsonl': 'json',
    'JSON': 'json',
    'binary': 'bytes',
    'blob': 'bytes',
    'varbinary': 'bytes',
    'bytea': 'bytes',
    'guid': 'uuid',
    'UUID': 'uuid',
    'geom': 'geometry',
    'geog': 'geography',
    'boolean': 'bool',
    'day': 'date',
}
MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
    'json': 'object',
    'numeric': 'object',
    'geometry': 'object',
    'geography': 'object',
    'uuid': 'object',
    'date': 'date32[day][pyarrow]',
    'datetime': 'datetime64[us, UTC]',
    'bool': 'bool[pyarrow]',
    'int': 'int64[pyarrow]',
    'int8': 'int8[pyarrow]',
    'int16': 'int16[pyarrow]',
    'int32': 'int32[pyarrow]',
    'int64': 'int64[pyarrow]',
    'str': 'string',
    'bytes': 'binary[pyarrow]',
    None: 'object',
}

MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
    'nanosecond': 1_000_000_000,
    'microsecond': 1_000_000,
    'millisecond': 1000,
    'second': 1,
    'minute': (1 / 60),
    'hour': (1 / 3600),
    'day': (1 / 86400),
}

MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
    'ns': 'nanosecond',
    'us': 'microsecond',
    'ms': 'millisecond',
    's': 'second',
    'sec': 'second',
    'm': 'minute',
    'min': 'minute',
    'h': 'hour',
    'hr': 'hour',
    'd': 'day',
    'D': 'day',
}
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
    'nanosecond': 'ns',
    'microsecond': 'us',
    'millisecond': 'ms',
    'second': 's',
    'minute': 'min',
    'hour': 'hr',
    'day': 'D',
}
def deserialize_geometry(geom_wkb: Union[str, bytes]):
    """
    Deserialize a WKB string into a shapely geometry object.
    """
    shapely = mrsm.attempt_import('shapely', lazy=False)
    return shapely.wkb.loads(geom_wkb)


def project_geometry(geom, srid: int, to_srid: int = 4326):
    """
    Project a shapely geometry object to a new CRS (SRID).
    """
    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
    return shapely_ops.transform(transformer.transform, geom)


def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
    """
    Given a serialized ASCII string of bytes data, return the original bytes.
    The input data may either be base64- or hex-encoded.

    Parameters
    ----------
    data: Optional[str]
        The string to be deserialized into bytes.
        May be base64- or hex-encoded (prefixed with `'\\x'`).

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded.
        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
        This will still strip the leading `'\\x'` prefix if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    """
    if not isinstance(data, str) and value_is_null(data):
        return data

    import binascii
    import base64

    is_hex = force_hex or data.startswith('\\x')

    if is_hex:
        if data.startswith('\\x'):
            data = data[2:]
        return binascii.unhexlify(data)

    return base64.b64decode(data)


def deserialize_base64(data: str) -> bytes:
    """
    Return the original bytestring from the given base64-encoded string.
    """
    import base64
    return base64.b64decode(data)


def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
    """
    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
    """
    import binascii
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')


def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not hasattr(dt, 'isoformat'):
        return None

    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
    return dt.isoformat() + tz_suffix


def serialize_date(d: date) -> Union[str, None]:
    """
    Serialize a date object into its ISO representation.
    """
    return d.isoformat() if hasattr(d, 'isoformat') else None


def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    """
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    if hasattr(x, 'tzinfo'):
        return serialize_datetime(x)

    if hasattr(x, 'isoformat'):
        return serialize_date(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    if 'shapely' in str(type(x)):
        return serialize_geometry(x)

    if value_is_null(x):
        return None

    if isinstance(x, (dict, list, tuple)):
        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))

    return str(x) if default_to_str else x


def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: int = 4326,
) -> Union[Tuple[str, int], Tuple[str, None]]:
    """
    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: str, default 'geometry'
        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 4326)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 4326)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
    ('MultiLineString', 4326)
    >>> get_geometry_type_srid('geography')
    ('geometry', 4326)
    >>> get_geometry_type_srid('geography[POINT]')
    ('Point', 4326)
    """
    from meerschaum.utils.misc import is_int
    ### NOTE: PostGIS syntax must also be parsed.
    dtype = dtype.replace('(', '[').replace(')', ']')
    bare_dtype = dtype.split('[', maxsplit=1)[0]
    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
    if not modifier:
        return default_type, default_srid

    parts = [
        part.split('=')[-1].strip()
        for part in modifier.split(',')
    ]
    parts_casted = [
        (
            int(part)
            if is_int(part)
            else part
        )
        for part in parts
    ]

    srid = default_srid
    geometry_type = default_type

    for part in parts_casted:
        if isinstance(part, int):
            srid = part
            break

    for part in parts_casted:
        if isinstance(part, str):
            geometry_type = part
            break

    return geometry_type, srid


def get_current_timestamp(
    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
    precision_interval: int = 1,
    round_to: str = 'down',
    as_pandas: bool = False,
    as_int: bool = False,
    _now: Union[datetime, int, None] = None,
) -> 'Union[datetime, pd.Timestamp, int]':
    """
    Return the current UTC timestamp at the given precision unit.

    Parameters
    ----------
    precision_unit: str, default 'us'
        The precision of the timestamp to be returned.
        Valid values are the following:
        - `ns` / `nanosecond`
        - `us` / `microsecond`
        - `ms` / `millisecond`
        - `s` / `sec` / `second`
        - `m` / `min` / `minute`
        - `h` / `hr` / `hour`
        - `d` / `day`

    precision_interval: int, default 1
        Round the timestamp to the `precision_interval` units.
        For example, `precision_unit='minute'` and `precision_interval=15` will round to 15-minute intervals.
        Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`.

    round_to: str, default 'down'
        The direction to which to round the timestamp.
        Available options are `down`, `up`, and `closest`.

    as_pandas: bool, default False
        If `True`, return a Pandas Timestamp.
        This is always the case when `precision_unit` is `nanosecond`.

    as_int: bool, default False
        If `True`, return the timestamp as an integer.
        Overrides `as_pandas`.

    Returns
    -------
    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.

    Examples
    --------
    >>> get_current_timestamp('ns')
    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
    >>> get_current_timestamp('ms')
    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
        from meerschaum.utils.misc import items_str
        raise ValueError(
            f"Unknown precision unit '{precision_unit}'. "
            "Accepted values are "
            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
        )

    if not as_int:
        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None

    if true_precision_unit == 'nanosecond':
        if precision_interval != 1:
            warn("`precision_interval` must be 1 for nanosecond precision.")
        now_ts = time.time_ns() if not isinstance(_now, int) else _now
        if as_int:
            return now_ts
        return pd.to_datetime(now_ts, unit='ns', utc=True)

    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
    delta = timedelta(**{true_precision_unit + 's': precision_interval})
    rounded_now = round_time(now, delta, to=round_to)

    if as_int:
        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])

    ts_val = (
        pd.to_datetime(rounded_now, utc=True)
        if as_pandas
        else rounded_now
    )

    if not as_pandas:
        return ts_val

    as_unit_precisions = ('microsecond', 'millisecond', 'second')
    if true_precision_unit not in as_unit_precisions:
        return ts_val

    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])


def is_dtype_special(type_: str) -> bool:
    """
    Return whether a dtype should be treated as a special Meerschaum dtype.
    This is not the same as a Meerschaum alias.
    """
    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
    if true_type in (
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    ):
        return True

    if are_dtypes_equal(true_type, 'datetime'):
        return True

    if are_dtypes_equal(true_type, 'date'):
        return True

    if true_type.startswith('numeric'):
        return True

    if true_type.startswith('bool'):
        return True

    if true_type.startswith('geometry'):
        return True

    if true_type.startswith('geography'):
        return True

    return False


def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the next precision string in order of value.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).

    decrease: bool, default True
        If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the precision unit which is higher.

    Returns
    -------
    A `precision` string which is lower or higher than the given precision unit.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
    if not precision_scalar:
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    precisions = sorted(
        list(MRSM_PRECISION_UNITS_SCALARS),
        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
    )

    precision_index = precisions.index(true_precision_unit)
    new_precision_index = precision_index + (-1 if decrease else 1)
    if new_precision_index < 0 or new_precision_index >= len(precisions):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return precisions[new_precision_index]


def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...     datetime(2022, 1, 1, 12, 15, 57, 200),
    ...     timedelta(hours=1),
    ...     to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...     datetime(2022, 1, 1, 12, 45, 57, 200),
    ...     timedelta(hours=1),
    ...     to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    def get_total_microseconds(td: timedelta) -> int:
        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds

    round_to_microseconds = get_total_microseconds(date_delta)
    if round_to_microseconds == 0:
        return dt

    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)

    dt_dec = Decimal(dt_total_microseconds)
    round_to_dec = Decimal(round_to_microseconds)

    div = dt_dec / round_to_dec
    if to == 'down':
        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
    elif to == 'up':
        num_intervals = div.to_integral_value(rounding=ROUND_UP)
    else:
        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)

    rounded_dt_total_microseconds = num_intervals * round_to_dec
    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds

    return dt + timedelta(microseconds=adjustment_microseconds)
def to_pandas_dtype(dtype: str) -> str:
    """
    Cast a supported Meerschaum dtype to a Pandas dtype.
    """
    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
    if known_dtype is not None:
        return known_dtype

    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
    if alias_dtype is not None:
        return MRSM_PD_DTYPES[alias_dtype]

    if dtype.startswith('numeric'):
        return MRSM_PD_DTYPES['numeric']

    if dtype.startswith('geometry'):
        return MRSM_PD_DTYPES['geometry']

    if dtype.startswith('geography'):
        return MRSM_PD_DTYPES['geography']

    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
    ### treat it as a SQL db type.
    if dtype.split(' ')[0].isupper():
        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
        return get_pd_type_from_db_type(dtype)

    from meerschaum.utils.packages import attempt_import
    _ = attempt_import('pyarrow', lazy=False)
    pandas = attempt_import('pandas', lazy=False)

    try:
        return str(pandas.api.types.pandas_dtype(dtype))
    except Exception:
        warn(
            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
            + f"{traceback.format_exc()}",
            stack=False,
        )
        return 'object'
Cast a supported Meerschaum dtype to a Pandas dtype.
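A quick usage sketch (the returned strings follow the mapping tables above; treat them as illustrative):

>>> from meerschaum.utils.dtypes import to_pandas_dtype
>>> to_pandas_dtype('numeric')
'object'
>>> to_pandas_dtype('datetime')
'datetime64[us, UTC]'
>>> to_pandas_dtype('guid')   # resolved through MRSM_ALIAS_DTYPES to 'uuid'
'object'
>>> to_pandas_dtype('int')
'int64[pyarrow]'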
def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    """
    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
        lkeys = sorted([str(k) for k in ldtype.keys()])
        rkeys = sorted([str(k) for k in rdtype.keys()])
        for lkey, rkey in zip(lkeys, rkeys):
            if lkey != rkey:
                return False
            ltype = ldtype[lkey]
            rtype = rdtype[rkey]
            if not are_dtypes_equal(ltype, rtype):
                return False
        return True

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    if ldtype in MRSM_ALIAS_DTYPES:
        ldtype = MRSM_ALIAS_DTYPES[ldtype]

    if rdtype in MRSM_ALIAS_DTYPES:
        rdtype = MRSM_ALIAS_DTYPES[rdtype]

    json_dtypes = ('json', 'object')
    if ldtype in json_dtypes and rdtype in json_dtypes:
        return True

    numeric_dtypes = ('numeric', 'decimal', 'object')
    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
        return True

    uuid_dtypes = ('uuid', 'object')
    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
        return True

    bytes_dtypes = ('bytes', 'object', 'binary')
    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
        return True

    geometry_dtypes = ('geometry', 'object', 'geography')
    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
        return True

    if ldtype.lower() == rdtype.lower():
        return True

    datetime_dtypes = ('datetime', 'timestamp')
    ldtype_found_dt_prefix = False
    rdtype_found_dt_prefix = False
    for dt_prefix in datetime_dtypes:
        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
        return True

    string_dtypes = ('str', 'string', 'object')
    if ldtype in string_dtypes and rdtype in string_dtypes:
        return True

    int_dtypes = (
        'int', 'int64', 'int32', 'int16', 'int8',
        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
    )
    int_substrings = ('int',)
    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
        return True
    for substring in int_substrings:
        if substring in ldtype.lower() and substring in rdtype.lower():
            return True

    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
        return True

    bool_dtypes = ('bool', 'boolean')
    if ldtype in bool_dtypes and rdtype in bool_dtypes:
        return True

    date_dtypes = (
        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
    )
    if ldtype in date_dtypes and rdtype in date_dtypes:
        return True

    return False
Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.
Parameters
- ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
- rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
- A `bool` indicating whether the two dtypes are to be considered equivalent.
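A minimal sketch of how the equivalence check behaves (values illustrative):

>>> from meerschaum.utils.dtypes import are_dtypes_equal
>>> are_dtypes_equal('datetime64[ns, UTC]', 'datetime')
True
>>> are_dtypes_equal('Int64', 'int32[pyarrow]')
True
>>> are_dtypes_equal('float64', 'int64')
False
>>> are_dtypes_equal({'a': 'json'}, {'a': 'object'})
True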
def is_dtype_numeric(dtype: str) -> bool:
    """
    Determine whether a given `dtype` string
    should be considered compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    A bool indicating the dtype is compatible with `numeric`.
    """
    dtype_lower = dtype.lower()

    acceptable_substrings = ('numeric', 'float', 'double', 'int')
    for substring in acceptable_substrings:
        if substring in dtype_lower:
            return True

    return False
Determine whether a given dtype string
should be considered compatible with the Meerschaum dtype numeric.
Parameters
- dtype (str): The pandas-like dtype string.
Returns
- A bool indicating the dtype is compatible with `numeric`.
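A short sketch of the substring check:

>>> from meerschaum.utils.dtypes import is_dtype_numeric
>>> is_dtype_numeric('float64[pyarrow]')
True
>>> is_dtype_numeric('numeric[10,2]')
True
>>> is_dtype_numeric('string')
False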
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    """
    if isinstance(value, Decimal):
        if quantize and precision and scale:
            return quantize_decimal(value, precision, scale)
        return value
    try:
        if value_is_null(value):
            return Decimal('NaN')

        dec = Decimal(str(value))
        if not quantize or not precision or not scale:
            return dec
        return quantize_decimal(dec, precision, scale)
    except Exception:
        return value
Given a value, attempt to coerce it into a numeric (Decimal).
Parameters
- value (Any): The value to be cast to a Decimal.
- quantize (bool, default False): If `True`, quantize the decimal to the specified precision and scale.
- precision (Optional[int], default None): If `quantize` is `True`, use this precision.
- scale (Optional[int], default None): If `quantize` is `True`, use this scale.
Returns
- A `Decimal` if possible, or `value`.
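A quick usage sketch; unparseable values pass through unchanged:

>>> from decimal import Decimal
>>> from meerschaum.utils.dtypes import attempt_cast_to_numeric
>>> attempt_cast_to_numeric('3.14159', quantize=True, precision=5, scale=2)
Decimal('3.14')
>>> attempt_cast_to_numeric('not a number')
'not a number'
>>> attempt_cast_to_numeric(None)
Decimal('NaN')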
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a UUID (`uuid4`).
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        return (
            uuid.UUID(str(value))
            if not value_is_null(value)
            else None
        )
    except Exception:
        return value
Given a value, attempt to coerce it into a UUID (uuid4).
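A short sketch; invalid UUID strings pass through unchanged:

>>> from meerschaum.utils.dtypes import attempt_cast_to_uuid
>>> attempt_cast_to_uuid('12345678-1234-5678-1234-567812345678')
UUID('12345678-1234-5678-1234-567812345678')
>>> attempt_cast_to_uuid('not-a-uuid')
'not-a-uuid'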
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a bytestring.
    """
    if isinstance(value, bytes):
        return value
    try:
        return (
            deserialize_bytes_string(str(value))
            if not value_is_null(value)
            else None
        )
    except Exception:
        return value
Given a value, attempt to coerce it into a bytestring.
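A short sketch; string input is assumed to be base64- or hex-encoded (see `deserialize_bytes_string`):

>>> from meerschaum.utils.dtypes import attempt_cast_to_bytes
>>> attempt_cast_to_bytes('aGVsbG8=')   # base64-encoded input
b'hello'
>>> attempt_cast_to_bytes(b'raw bytes pass through')
b'raw bytes pass through'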
def attempt_cast_to_geometry(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
    """
    typ = str(type(value))
    if 'pandas' in typ and 'Series' in typ:
        if 'GeoSeries' in typ:
            return value
        gpd = mrsm.attempt_import('geopandas')
        if len(value) == 0:
            return gpd.GeoSeries([])

        ix = value.first_valid_index()
        if ix is None:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
        sample_val = value[ix]

        sample_is_gpkg = geometry_is_gpkg(sample_val)
        if sample_is_gpkg:
            try:
                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
            except Exception:
                traceback.print_exc()
                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
        try:
            return (
                gpd.GeoSeries.from_wkt(value)
                if sample_is_wkt
                else gpd.GeoSeries.from_wkb(value)
            )
        except Exception:
            traceback.print_exc()
            return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)

    if 'shapely' in typ:
        return value

    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
        'shapely',
        'shapely.wkt',
        'shapely.wkb',
        lazy=False,
    )

    if isinstance(value, (dict, list)):
        try:
            return shapely.from_geojson(json.dumps(value))
        except Exception:
            return value

    value_is_gpkg = geometry_is_gpkg(value)
    if value_is_gpkg:
        try:
            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
            return shapely_wkb.loads(wkb_data)
        except Exception:
            return value

    value_is_wkt = geometry_is_wkt(value)
    if value_is_wkt is None:
        return value

    try:
        return (
            shapely_wkt.loads(value)
            if value_is_wkt
            else shapely_wkb.loads(value)
        )
    except Exception:
        pass

    return value
Given a value, attempt to coerce it into a shapely (geometry) object.
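A small sketch, assuming the optional `shapely` dependency is installed (reprs are illustrative):

>>> from meerschaum.utils.dtypes import attempt_cast_to_geometry
>>> point = attempt_cast_to_geometry('POINT (1 2)')   # WKT input
>>> point.x, point.y
(1.0, 2.0)
>>> attempt_cast_to_geometry({'type': 'Point', 'coordinates': [1, 2]})   # GeoJSON-like dict
<POINT (1 2)>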
def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
    """
    Determine whether an input value should be treated as WKT or WKB geometry data.

    Parameters
    ----------
    value: Union[str, bytes]
        The input data to be parsed into geometry data.

    Returns
    -------
    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
    Return `None` if `value` should be parsed as neither.
    """
    import re
    if not isinstance(value, (str, bytes)):
        return None

    if isinstance(value, bytes):
        return False

    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
    if re.match(wkt_pattern, value, re.IGNORECASE):
        return True

    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
        return False

    return None
Determine whether an input value should be treated as WKT or WKB geometry data.
Parameters
- value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
- A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
- Return `None` if `value` should be parsed as neither.
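A short sketch of the three possible outcomes:

>>> from meerschaum.utils.dtypes import geometry_is_wkt
>>> geometry_is_wkt('POINT (0 0)')
True
>>> geometry_is_wkt('0101000000000000000000f03f0000000000000040')   # hex-encoded WKB
False
>>> geometry_is_wkt(42) is None
True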
def geometry_is_gpkg(value: bytes) -> bool:
    """
    Return whether the input `value` is formatted as GeoPackage WKB.
    """
    if not isinstance(value, bytes) or len(value) < 2:
        return False

    return value[0:2] == b'GP'
Return whether the input value is formatted as GeoPackage WKB.
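A short sketch; only the two-byte `GP` magic number is checked:

>>> from meerschaum.utils.dtypes import geometry_is_gpkg
>>> geometry_is_gpkg(b'GP\x00\x01')
True
>>> geometry_is_gpkg(b'\x01\x01\x00\x00\x00')
False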
def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
    """
    Converts GeoPackage WKB to standard WKB by removing the header.

    Parameters
    ----------
    gpkg_wkb_bytes: bytes
        The GeoPackage WKB byte string.

    Returns
    -------
    A tuple containing the standard WKB bytes, SRID, and flags.
    """
    magic_number = gpkg_wkb_bytes[0:2]
    if magic_number != b'GP':
        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")

    try:
        header = gpkg_wkb_bytes[0:8]
        header_vals = struct.unpack('<ccBBi', header)
        flags = header_vals[-2]
        srid = header_vals[-1]
    except struct.error:
        header = gpkg_wkb_bytes[0:6]
        header_vals = struct.unpack('<ccBBh', header)
        flags = header_vals[-2]
        srid = header_vals[-1]

    envelope_type = (flags >> 1) & 0x07
    envelope_sizes = {
        0: 0,
        1: 32,
        2: 48,
        3: 48,
        4: 64,
    }
    header_length = 8 + envelope_sizes.get(envelope_type, 0)
    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
    return standard_wkb_bytes, srid, flags
Converts GeoPackage WKB to standard WKB by removing the header.
Parameters
- gpkg_wkb_bytes (bytes): The GeoPackage WKB byte string.
Returns
- A tuple containing the standard WKB bytes, SRID, and flags.
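A round-trip sketch with a hand-built GeoPackage blob (the header fields below are illustrative: little-endian flag set, no envelope, SRID 4326):

>>> import struct
>>> from meerschaum.utils.dtypes import gpkg_wkb_to_wkb
>>> point_wkb = struct.pack('<BIdd', 1, 1, 0.0, 0.0)   # little-endian WKB for POINT (0 0)
>>> gpkg_blob = struct.pack('<ccBBi', b'G', b'P', 0, 1, 4326) + point_wkb
>>> wkb, srid, flags = gpkg_wkb_to_wkb(gpkg_blob)
>>> srid, wkb == point_wkb
(4326, True)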
def value_is_null(value: Any) -> bool:
    """
    Determine if a value is a null-like string.
    """
    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
Determine if a value is a null-like string.
def none_if_null(value: Any) -> Any:
    """
    Return `None` if a value is a null-like string.
    """
    return (None if value_is_null(value) else value)
Return None if a value is a null-like string.
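A short sketch of the two null helpers:

>>> from meerschaum.utils.dtypes import value_is_null, none_if_null
>>> value_is_null('<NA>')
True
>>> none_if_null('NaT') is None
True
>>> none_if_null('hello')
'hello'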
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision.
    """
    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
    try:
        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
    except InvalidOperation:
        pass

    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
Quantize a given Decimal to a known scale and precision.
Parameters
- x (Decimal): The `Decimal` to be quantized.
- precision (int): The total number of significant digits.
- scale (int): The number of significant digits after the decimal point.
Returns
- A `Decimal` quantized to the specified scale and precision.
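A short sketch; a `ValueError` is raised when the value cannot fit within the given precision:

>>> from decimal import Decimal
>>> from meerschaum.utils.dtypes import quantize_decimal
>>> quantize_decimal(Decimal('123.4567'), 7, 2)
Decimal('123.46')
>>> quantize_decimal(Decimal('0.5'), 3, 0)
Decimal('1')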
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.

    Returns
    -------
    A string of the input decimal or the input if not a Decimal.
    """
    if not isinstance(x, Decimal):
        return x

    if value_is_null(x):
        return None

    if quantize and scale and precision:
        x = quantize_decimal(x, precision, scale)

    return f"{x:f}"
Return a quantized string of an input decimal.
Parameters
- x (Any): The potential decimal to be serialized.
- quantize (bool, default False): If `True`, quantize the incoming Decimal to the specified scale and precision before serialization.
- precision (Optional[int], default None): The precision of the decimal to be quantized.
- scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
- A string of the input decimal or the input if not a Decimal.
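A short sketch; the `:f` formatting avoids scientific notation, and non-Decimal input passes through:

>>> from decimal import Decimal
>>> from meerschaum.utils.dtypes import serialize_decimal
>>> serialize_decimal(Decimal('1E+2'))
'100'
>>> serialize_decimal(Decimal('3.14159'), quantize=True, precision=5, scale=2)
'3.14'
>>> serialize_decimal('not a decimal')
'not a decimal'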
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
    """
    if dt is None:
        return None

    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt)
        except Exception:
            return dt

    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
    if dt_is_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        if (
            pandas.api.types.is_datetime64_any_dtype(dt) and (
                (dt.dt.tz is not None and not strip_utc)
                or
                (dt.dt.tz is None and strip_utc)
            )
        ):
            return dt

        dt_series = to_datetime(dt, coerce_utc=False)
        if dt_series.dt.tz is None:
            dt_series = dt_series.dt.tz_localize(timezone.utc)
        if strip_utc:
            try:
                if dt_series.dt.tz is not None:
                    dt_series = dt_series.dt.tz_localize(None)
            except Exception:
                pass

        return dt_series

    if dt.tzinfo is None:
        if strip_utc:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    utc_dt = dt.astimezone(timezone.utc)
    if strip_utc:
        return utc_dt.replace(tzinfo=None)
    return utc_dt
Given a `datetime`, pandas `Timestamp`, or `Series` of `Timestamp`, return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
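A short sketch with plain `datetime` objects:

>>> from datetime import datetime, timezone
>>> from meerschaum.utils.dtypes import coerce_timezone
>>> coerce_timezone(datetime(2024, 1, 1))
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> coerce_timezone(datetime(2024, 1, 1, tzinfo=timezone.utc), strip_utc=True)
datetime.datetime(2024, 1, 1, 0, 0)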
def to_datetime(
    dt_val: Any,
    as_pydatetime: bool = False,
    coerce_utc: bool = True,
    precision_unit: Optional[str] = None,
) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The value to coerce to Pandas Timestamps.

    as_pydatetime: bool, default False
        If `True`, return a Python datetime object.

    coerce_utc: bool, default True
        If `True`, ensure the value has UTC tzinfo.

    precision_unit: Optional[str], default None
        If provided, enforce the provided precision unit.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd
    enforce_precision = precision_unit is not None
    precision_unit = precision_unit or 'microsecond'
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
    if not precision_abbreviation:
        raise ValueError(f"Invalid precision '{precision_unit}'.")

    def parse(x: Any) -> Any:
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
        dtype_check_against = (
            f"datetime64[{precision_abbreviation}, UTC]"
            if with_utc
            else f"datetime64[{precision_abbreviation}]"
        )
        return (
            dtype_to_check == dtype_check_against
            if enforce_precision
            else (
                dtype_to_check.startswith('datetime64[')
                and (
                    ('utc' in dtype_to_check.lower())
                    if with_utc
                    else ('utc' not in dtype_to_check.lower())
                )
            )
        )

    if isinstance(dt_val, pd.Timestamp):
        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
        return (
            coerce_timezone(dt_val_to_return)
            if coerce_utc
            else dt_val_to_return
        )

    if dt_is_series:
        changed_tz = False
        original_tz = None
        dtype = str(getattr(dt_val, 'dtype', 'object'))
        if (
            are_dtypes_equal(dtype, 'datetime')
            and 'utc' not in dtype.lower()
            and hasattr(dt_val, 'dt')
        ):
            original_tz = dt_val.dt.tz
            dt_val = dt_val.dt.tz_localize(timezone.utc)
            changed_tz = True
            dtype = str(getattr(dt_val, 'dtype', 'object'))
        try:
            new_dt_series = (
                dt_val
                if check_dtype(dtype, with_utc=True)
                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
            )
        except pd.errors.OutOfBoundsDatetime:
            try:
                next_precision = get_next_precision_unit(true_precision_unit)
                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
            except Exception:
                new_dt_series = None
        except ValueError:
            new_dt_series = None
        except TypeError:
            try:
                new_dt_series = (
                    new_dt_series
                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
                )
            except Exception:
                new_dt_series = None

        if new_dt_series is None:
            new_dt_series = dt_val.apply(lambda x: parse(str(x)))

        if coerce_utc:
            return coerce_timezone(new_dt_series)

        if changed_tz:
            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
        return new_dt_series

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if new_dt_val.unit != precision_abbreviation:
            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        pass

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)
Wrap pd.to_datetime() and add support for out-of-bounds values.
Parameters
- dt_val (Any): The value to coerce to Pandas Timestamps.
- as_pydatetime (bool, default False): If `True`, return a Python datetime object.
- coerce_utc (bool, default True): If `True`, ensure the value has UTC tzinfo.
- precision_unit (Optional[str], default None): If provided, enforce the provided precision unit.
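A brief usage sketch (not part of the original docstring); it assumes pandas is installed and the input strings are illustrative:

from meerschaum.utils.dtypes import to_datetime

# Scalars are parsed to UTC pandas Timestamps by default.
ts = to_datetime('2024-01-01T12:00:00')

# as_pydatetime returns a standard datetime.datetime (UTC, since coerce_utc defaults to True).
dt = to_datetime('2024-01-01T12:00:00', as_pydatetime=True)

# precision_unit enforces the underlying resolution (e.g. milliseconds).
ts_ms = to_datetime('2024-01-01T12:00:00', precision_unit='ms')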
782def serialize_bytes(data: bytes) -> str: 783 """ 784 Return the given bytes as a base64-encoded string. 785 """ 786 import base64 787 if not isinstance(data, bytes) and value_is_null(data): 788 return data 789 return base64.b64encode(data).decode('utf-8')
Return the given bytes as a base64-encoded string.
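As a small illustrative sketch (null-like values pass through unchanged per the implementation above):

from meerschaum.utils.dtypes import serialize_bytes

serialize_bytes(b'hello')   # 'aGVsbG8='
serialize_bytes(None)       # None (null-like values are returned as-is)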
792def serialize_geometry( 793 geom: Any, 794 geometry_format: str = 'wkb_hex', 795 srid: Optional[int] = None, 796) -> Union[str, Dict[str, Any], bytes, None]: 797 """ 798 Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 799 800 Parameters 801 ---------- 802 geom: Any 803 The potential geometry data to be serialized. 804 805 geometry_format: str, default 'wkb_hex' 806 The serialization format for geometry data. 807 Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`. 808 809 srid: Optional[int], default None 810 If provided, use this as the source CRS when serializing to GeoJSON. 811 812 Returns 813 ------- 814 A string containing the geometry data, or bytes, or a dictionary, or None. 815 """ 816 if value_is_null(geom): 817 return None 818 819 shapely, shapely_ops, pyproj, np = mrsm.attempt_import( 820 'shapely', 'shapely.ops', 'pyproj', 'numpy', 821 lazy=False, 822 ) 823 if geometry_format == 'geojson': 824 if srid: 825 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True) 826 geom = shapely_ops.transform(transformer.transform, geom) 827 geojson_str = shapely.to_geojson(geom) 828 return json.loads(geojson_str) 829 830 if not hasattr(geom, 'wkb_hex'): 831 return str(geom) 832 833 byte_order = 1 if np.little_endian else 0 834 835 if geometry_format.startswith("wkb"): 836 return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True) 837 elif geometry_format == 'gpkg_wkb': 838 wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order) 839 flags = ( 840 ((byte_order & 0x01) | (0x20)) 841 if geom.is_empty 842 else (byte_order & 0x01) 843 ) 844 srid_val = srid or -1 845 header = struct.pack( 846 '<ccBBi', 847 b'G', b'P', 848 0, 849 flags, 850 srid_val 851 ) 852 return header + wkb_data 853 854 return shapely.to_wkt(geom)
Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.
Parameters
- geom (Any): The potential geometry data to be serialized.
- geometry_format (str, default 'wkb_hex'): The serialization format for geometry data. Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
- srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON.
Returns
- A string containing the geometry data, or bytes, or a dictionary, or None.
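A hedged round-trip sketch (assumes shapely 2.x is installed; the coordinates are arbitrary):

import shapely
from meerschaum.utils.dtypes import serialize_geometry, deserialize_geometry

point = shapely.Point(-90.0, 30.0)
wkb_hex = serialize_geometry(point)              # hex WKB string (the default 'wkb_hex' format)
geojson = serialize_geometry(point, 'geojson')   # dict like {'type': 'Point', 'coordinates': [-90.0, 30.0]}
restored = deserialize_geometry(wkb_hex)         # back to a shapely Point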
857def deserialize_geometry(geom_wkb: Union[str, bytes]): 858 """ 859 Deserialize a WKB string into a shapely geometry object. 860 """ 861 shapely = mrsm.attempt_import('shapely', lazy=False) 862 return shapely.wkb.loads(geom_wkb)
Deserialize a WKB string into a shapely geometry object.
865def project_geometry(geom, srid: int, to_srid: int = 4326): 866 """ 867 Project a shapely geometry object to a new CRS (SRID). 868 """ 869 pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False) 870 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True) 871 return shapely_ops.transform(transformer.transform, geom)
Project a shapely geometry object to a new CRS (SRID).
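A minimal sketch (assumes pyproj and shapely are available); the Web Mercator coordinates below are arbitrary:

import shapely
from meerschaum.utils.dtypes import project_geometry

point_3857 = shapely.Point(-10018754.17, 3503549.84)
point_4326 = project_geometry(point_3857, srid=3857, to_srid=4326)   # roughly (-90.0, 30.0) in WGS84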
874def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]: 875 """ 876 Given a serialized ASCII string of bytes data, return the original bytes. 877 The input data may either be base64- or hex-encoded. 878 879 Parameters 880 ---------- 881 data: Optional[str] 882 The string to be deserialized into bytes. 883 May be base64- or hex-encoded (prefixed with `'\\x'`). 884 885 force_hex: bool = False 886 If `True`, treat the input string as hex-encoded. 887 If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`. 888 This will still strip the leading `'\\x'` prefix if present. 889 890 Returns 891 ------- 892 The original bytes used to produce the encoded string `data`. 893 """ 894 if not isinstance(data, str) and value_is_null(data): 895 return data 896 897 import binascii 898 import base64 899 900 is_hex = force_hex or data.startswith('\\x') 901 902 if is_hex: 903 if data.startswith('\\x'): 904 data = data[2:] 905 return binascii.unhexlify(data) 906 907 return base64.b64decode(data)
Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.
Parameters
- data (Optional[str]): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with `'\x'`).
- force_hex (bool, default False): If `True`, treat the input string as hex-encoded. If `data` does not begin with the prefix `'\x'`, set `force_hex` to `True`. This will still strip the leading `'\x'` prefix if present.
Returns
- The original bytes used to produce the encoded string `data`.
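A short round-trip sketch using the helpers above (illustrative only):

from meerschaum.utils.dtypes import serialize_bytes, deserialize_bytes_string

data = b'\x00\x01meerschaum'
encoded = serialize_bytes(data)                  # base64 string
assert deserialize_bytes_string(encoded) == data

# Hex-encoded strings (e.g. from PostgreSQL's BYTEA) are detected by the '\x' prefix.
assert deserialize_bytes_string(r'\x0001') == b'\x00\x01'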
910def deserialize_base64(data: str) -> bytes: 911 """ 912 Return the original bytestring from the given base64-encoded string. 913 """ 914 import base64 915 return base64.b64decode(data)
Return the original bytestring from the given base64-encoded string.
918def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]: 919 """ 920 Return the given bytes as a hex string for PostgreSQL's `BYTEA` type. 921 """ 922 import binascii 923 if not isinstance(data, bytes) and value_is_null(data): 924 return data 925 return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
Return the given bytes as a hex string for PostgreSQL's BYTEA type.
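For illustration (the values in the comments follow from the hex encoding of the input bytes):

from meerschaum.utils.dtypes import encode_bytes_for_bytea

encode_bytes_for_bytea(b'\x00\xff')                      # r'\x00ff'
encode_bytes_for_bytea(b'\x00\xff', with_prefix=False)   # '00ff'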
928def serialize_datetime(dt: datetime) -> Union[str, None]: 929 """ 930 Serialize a datetime object into JSON (ISO format string). 931 932 Examples 933 -------- 934 >>> import json 935 >>> from datetime import datetime 936 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 937 '{"a": "2022-01-01T00:00:00Z"}' 938 939 """ 940 if not hasattr(dt, 'isoformat'): 941 return None 942 943 tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else '' 944 return dt.isoformat() + tz_suffix
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
947def serialize_date(d: date) -> Union[str, None]: 948 """ 949 Serialize a date object into its ISO representation. 950 """ 951 return d.isoformat() if hasattr(d, 'isoformat') else None
Serialize a date object into its ISO representation.
954def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]: 955 """ 956 Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc. 957 958 Parameters 959 ---------- 960 x: Any 961 The value to serialize. 962 963 default_to_str: bool, default True 964 If `True`, return a string of `x` if x is not a designated type. 965 Otherwise return x. 966 967 Returns 968 ------- 969 A serialized version of x, or x. 970 """ 971 if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)): 972 return x.meta 973 974 if hasattr(x, 'tzinfo'): 975 return serialize_datetime(x) 976 977 if hasattr(x, 'isoformat'): 978 return serialize_date(x) 979 980 if isinstance(x, bytes): 981 return serialize_bytes(x) 982 983 if isinstance(x, Decimal): 984 return serialize_decimal(x) 985 986 if 'shapely' in str(type(x)): 987 return serialize_geometry(x) 988 989 if value_is_null(x): 990 return None 991 992 if isinstance(x, (dict, list, tuple)): 993 return json.dumps(x, default=json_serialize_value, separators=(',', ':')) 994 995 return str(x) if default_to_str else x
Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
Parameters
- x (Any): The value to serialize.
- default_to_str (bool, default True): If `True`, return a string of `x` if `x` is not a designated type. Otherwise return `x`.
Returns
- A serialized version of x, or x.
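A hedged sketch of passing this as the `default` hook to `json.dumps` (exact Decimal and bytes formatting depends on the serializers above):

import json
from decimal import Decimal
from datetime import datetime, timezone
from meerschaum.utils.dtypes import json_serialize_value

doc = {
    'dt': datetime(2024, 1, 1, tzinfo=timezone.utc),
    'amount': Decimal('1.23'),
    'blob': b'hello',
}
# Datetimes become ISO strings, bytes become base64, and Decimals go through serialize_decimal.
json.dumps(doc, default=json_serialize_value)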
998def get_geometry_type_srid( 999 dtype: str = 'geometry', 1000 default_type: str = 'geometry', 1001 default_srid: int = 4326, 1002) -> Union[Tuple[str, int], Tuple[str, None]]: 1003 """ 1004 Given the specified geometry `dtype`, return a tuple in the form (type, SRID). 1005 1006 Parameters 1007 ---------- 1008 dtype: Optional[str], default None 1009 Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). 1010 You may specify a supported `shapely` geometry type and an SRID in the dtype modifier: 1011 1012 - `Point` 1013 - `LineString` 1014 - `LinearRing` 1015 - `Polygon` 1016 - `MultiPoint` 1017 - `MultiLineString` 1018 - `MultiPolygon` 1019 - `GeometryCollection` 1020 1021 Returns 1022 ------- 1023 A tuple in the form (type, SRID). 1024 Defaults to `(default_type, default_srid)`. 1025 1026 Examples 1027 -------- 1028 >>> from meerschaum.utils.dtypes import get_geometry_type_srid 1029 >>> get_geometry_type_srid() 1030 ('geometry', 4326) 1031 >>> get_geometry_type_srid('geometry[]') 1032 ('geometry', 4326) 1033 >>> get_geometry_type_srid('geometry[Point, 0]') 1034 ('Point', 0) 1035 >>> get_geometry_type_srid('geometry[0, Point]') 1036 ('Point', 0) 1037 >>> get_geometry_type_srid('geometry[0]') 1038 ('geometry', 0) 1039 >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]') 1040 ('MultiLineString', 4326) 1041 >>> get_geometry_type_srid('geography') 1042 ('geometry', 4326) 1043 >>> get_geometry_type_srid('geography[POINT]') 1044 ('Point', 4376) 1045 """ 1046 from meerschaum.utils.misc import is_int 1047 ### NOTE: PostGIS syntax must also be parsed. 1048 dtype = dtype.replace('(', '[').replace(')', ']') 1049 bare_dtype = dtype.split('[', maxsplit=1)[0] 1050 modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']') 1051 if not modifier: 1052 return default_type, default_srid 1053 1054 parts = [ 1055 part.split('=')[-1].strip() 1056 for part in modifier.split(',') 1057 ] 1058 parts_casted = [ 1059 ( 1060 int(part) 1061 if is_int(part) 1062 else part 1063 ) 1064 for part in parts 1065 ] 1066 1067 srid = default_srid 1068 geometry_type = default_type 1069 1070 for part in parts_casted: 1071 if isinstance(part, int): 1072 srid = part 1073 break 1074 1075 for part in parts_casted: 1076 if isinstance(part, str): 1077 geometry_type = part 1078 break 1079 1080 return geometry_type, srid
Given the specified geometry dtype, return a tuple in the form (type, SRID).
Parameters
- dtype (Optional[str], default None): Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
  - `Point`
  - `LineString`
  - `LinearRing`
  - `Polygon`
  - `MultiPoint`
  - `MultiLineString`
  - `MultiPolygon`
  - `GeometryCollection`
Returns
- A tuple in the form (type, SRID). Defaults to `(default_type, default_srid)`.
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 4326)
>>> get_geometry_type_srid('geometry[]')
('geometry', 4326)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MultiLineString', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 4326)
>>> get_geometry_type_srid('geography[POINT]')
('Point', 4326)
1083def get_current_timestamp( 1084 precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'], 1085 precision_interval: int = 1, 1086 round_to: str = 'down', 1087 as_pandas: bool = False, 1088 as_int: bool = False, 1089 _now: Union[datetime, int, None] = None, 1090) -> 'Union[datetime, pd.Timestamp, int]': 1091 """ 1092 Return the current UTC timestamp to nanosecond precision. 1093 1094 Parameters 1095 ---------- 1096 precision_unit: str, default 'us' 1097 The precision of the timestamp to be returned. 1098 Valid values are the following: 1099 - `ns` / `nanosecond` 1100 - `us` / `microsecond` 1101 - `ms` / `millisecond` 1102 - `s` / `sec` / `second` 1103 - `m` / `min` / `minute` 1104 - `h` / `hr` / `hour` 1105 - `d` / `day` 1106 1107 precision_interval: int, default 1 1108 Round the timestamp to the `precision_interval` units. 1109 For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals. 1110 Note: `precision_interval` must be 1 when `precision='nanosecond'`. 1111 1112 round_to: str, default 'down' 1113 The direction to which to round the timestamp. 1114 Available options are `down`, `up`, and `closest`. 1115 1116 as_pandas: bool, default False 1117 If `True`, return a Pandas Timestamp. 1118 This is always true if `unit` is `nanosecond`. 1119 1120 as_int: bool, default False 1121 If `True`, return the timestamp to an integer. 1122 Overrides `as_pandas`. 1123 1124 Returns 1125 ------- 1126 A Pandas Timestamp, datetime object, or integer with precision to the provided unit. 1127 1128 Examples 1129 -------- 1130 >>> get_current_timestamp('ns') 1131 Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC') 1132 >>> get_current_timestamp('ms') 1133 Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC') 1134 """ 1135 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1136 if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS: 1137 from meerschaum.utils.misc import items_str 1138 raise ValueError( 1139 f"Unknown precision unit '{precision_unit}'. " 1140 "Accepted values are " 1141 f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}." 1142 ) 1143 1144 if not as_int: 1145 as_pandas = as_pandas or true_precision_unit == 'nanosecond' 1146 pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None 1147 1148 if true_precision_unit == 'nanosecond': 1149 if precision_interval != 1: 1150 warn("`precision_interval` must be 1 for nanosecond precision.") 1151 now_ts = time.time_ns() if not isinstance(_now, int) else _now 1152 if as_int: 1153 return now_ts 1154 return pd.to_datetime(now_ts, unit='ns', utc=True) 1155 1156 now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now 1157 delta = timedelta(**{true_precision_unit + 's': precision_interval}) 1158 rounded_now = round_time(now, delta, to=round_to) 1159 1160 if as_int: 1161 return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit]) 1162 1163 ts_val = ( 1164 pd.to_datetime(rounded_now, utc=True) 1165 if as_pandas 1166 else rounded_now 1167 ) 1168 1169 if not as_pandas: 1170 return ts_val 1171 1172 as_unit_precisions = ('microsecond', 'millisecond', 'second') 1173 if true_precision_unit not in as_unit_precisions: 1174 return ts_val 1175 1176 return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
Return the current UTC timestamp at the given precision.
Parameters
- precision_unit (str, default 'us'): The precision of the timestamp to be returned. Valid values are the following:
  - `ns` / `nanosecond`
  - `us` / `microsecond`
  - `ms` / `millisecond`
  - `s` / `sec` / `second`
  - `m` / `min` / `minute`
  - `h` / `hr` / `hour`
  - `d` / `day`
- precision_interval (int, default 1): Round the timestamp to `precision_interval` units. For example, `precision_unit='minute'` and `precision_interval=15` will round to 15-minute intervals (see the sketch after the examples below). Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`.
- round_to (str, default 'down'): The direction to which to round the timestamp. Available options are `down`, `up`, and `closest`.
- as_pandas (bool, default False): If `True`, return a Pandas Timestamp. This is always true if `precision_unit` is `nanosecond`.
- as_int (bool, default False): If `True`, return the timestamp as an integer. Overrides `as_pandas`.
Returns
- A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms')
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
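A hedged sketch of the interval rounding described above (outputs depend on the current time, so none are shown):

from meerschaum.utils.dtypes import get_current_timestamp

# Round the current UTC time down to the nearest 15-minute boundary.
ts = get_current_timestamp('minute', precision_interval=15, round_to='down')

# The same boundary as an integer count of minutes since the Unix epoch.
ts_int = get_current_timestamp('minute', precision_interval=15, as_int=True)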
1179def is_dtype_special(type_: str) -> bool: 1180 """ 1181 Return whether a dtype should be treated as a special Meerschaum dtype. 1182 This is not the same as a Meerschaum alias. 1183 """ 1184 true_type = MRSM_ALIAS_DTYPES.get(type_, type_) 1185 if true_type in ( 1186 'uuid', 1187 'json', 1188 'bytes', 1189 'numeric', 1190 'datetime', 1191 'geometry', 1192 'geography', 1193 'date', 1194 'bool', 1195 ): 1196 return True 1197 1198 if are_dtypes_equal(true_type, 'datetime'): 1199 return True 1200 1201 if are_dtypes_equal(true_type, 'date'): 1202 return True 1203 1204 if true_type.startswith('numeric'): 1205 return True 1206 1207 if true_type.startswith('bool'): 1208 return True 1209 1210 if true_type.startswith('geometry'): 1211 return True 1212 1213 if true_type.startswith('geography'): 1214 return True 1215 1216 return False
Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.
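A few illustrative calls (hedged; they follow the alias and prefix rules shown above):

from meerschaum.utils.dtypes import is_dtype_special

is_dtype_special('numeric[10,2]')   # True  (numeric with a precision/scale modifier)
is_dtype_special('guid')            # True  (alias of 'uuid')
is_dtype_special('int')             # False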
1219def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str: 1220 """ 1221 Get the next precision string in order of value. 1222 1223 Parameters 1224 ---------- 1225 precision_unit: str 1226 The precision string (`'nanosecond'`, `'ms'`, etc.). 1227 1228 decrease: bool, defaul True 1229 If `True` return the precision unit which is lower (e.g. `nanosecond` -> `millisecond`). 1230 If `False`, return the precision unit which is higher. 1231 1232 Returns 1233 ------- 1234 A `precision` string which is lower or higher than the given precision unit. 1235 1236 Examples 1237 -------- 1238 >>> get_next_precision_unit('nanosecond') 1239 'microsecond' 1240 >>> get_next_precision_unit('ms') 1241 'second' 1242 >>> get_next_precision_unit('hour', decrease=False) 1243 'minute' 1244 """ 1245 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1246 precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None) 1247 if not precision_scalar: 1248 raise ValueError(f"Invalid precision unit '{precision_unit}'.") 1249 1250 precisions = sorted( 1251 list(MRSM_PRECISION_UNITS_SCALARS), 1252 key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p] 1253 ) 1254 1255 precision_index = precisions.index(true_precision_unit) 1256 new_precision_index = precision_index + (-1 if decrease else 1) 1257 if new_precision_index < 0 or new_precision_index >= len(precisions): 1258 raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.") 1259 1260 return precisions[new_precision_index]
Get the next precision string in order of value.
Parameters
- precision_unit (str): The precision string (`'nanosecond'`, `'ms'`, etc.).
- decrease (bool, default True): If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`). If `False`, return the precision unit which is higher.
Returns
- A precision string which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
1263def round_time( 1264 dt: Optional[datetime] = None, 1265 date_delta: Optional[timedelta] = None, 1266 to: 'str' = 'down' 1267) -> datetime: 1268 """ 1269 Round a datetime object to a multiple of a timedelta. 1270 1271 Parameters 1272 ---------- 1273 dt: Optional[datetime], default None 1274 If `None`, grab the current UTC datetime. 1275 1276 date_delta: Optional[timedelta], default None 1277 If `None`, use a delta of 1 minute. 1278 1279 to: 'str', default 'down' 1280 Available options are `'up'`, `'down'`, and `'closest'`. 1281 1282 Returns 1283 ------- 1284 A rounded `datetime` object. 1285 1286 Examples 1287 -------- 1288 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 1289 datetime.datetime(2022, 1, 1, 12, 15) 1290 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 1291 datetime.datetime(2022, 1, 1, 12, 16) 1292 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 1293 datetime.datetime(2022, 1, 1, 12, 0) 1294 >>> round_time( 1295 ... datetime(2022, 1, 1, 12, 15, 57, 200), 1296 ... timedelta(hours=1), 1297 ... to = 'closest' 1298 ... ) 1299 datetime.datetime(2022, 1, 1, 12, 0) 1300 >>> round_time( 1301 ... datetime(2022, 1, 1, 12, 45, 57, 200), 1302 ... datetime.timedelta(hours=1), 1303 ... to = 'closest' 1304 ... ) 1305 datetime.datetime(2022, 1, 1, 13, 0) 1306 1307 """ 1308 from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP 1309 if date_delta is None: 1310 date_delta = timedelta(minutes=1) 1311 1312 if dt is None: 1313 dt = datetime.now(timezone.utc).replace(tzinfo=None) 1314 1315 def get_total_microseconds(td: timedelta) -> int: 1316 return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds 1317 1318 round_to_microseconds = get_total_microseconds(date_delta) 1319 if round_to_microseconds == 0: 1320 return dt 1321 1322 dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min 1323 dt_total_microseconds = get_total_microseconds(dt_delta_from_min) 1324 1325 dt_dec = Decimal(dt_total_microseconds) 1326 round_to_dec = Decimal(round_to_microseconds) 1327 1328 div = dt_dec / round_to_dec 1329 if to == 'down': 1330 num_intervals = div.to_integral_value(rounding=ROUND_DOWN) 1331 elif to == 'up': 1332 num_intervals = div.to_integral_value(rounding=ROUND_UP) 1333 else: 1334 num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP) 1335 1336 rounded_dt_total_microseconds = num_intervals * round_to_dec 1337 adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds 1338 1339 return dt + timedelta(microseconds=adjustment_microseconds)
Round a datetime object to a multiple of a timedelta.
Parameters
- dt (Optional[datetime], default None): If `None`, grab the current UTC datetime.
- date_delta (Optional[timedelta], default None): If `None`, use a delta of 1 minute.
- to (str, default 'down'): Available options are `'up'`, `'down'`, and `'closest'`.
Returns
- A rounded `datetime` object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 15, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 45, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)