meerschaum.utils.dtypes
Utility functions for working with data types.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Utility functions for working with data types. 7""" 8 9import traceback 10import json 11import uuid 12import time 13import struct 14from datetime import timezone, datetime, date, timedelta 15from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP 16 17import meerschaum as mrsm 18from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple 19from meerschaum.utils.warnings import warn 20from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG 21 22MRSM_ALIAS_DTYPES: Dict[str, str] = { 23 'decimal': 'numeric', 24 'Decimal': 'numeric', 25 'number': 'numeric', 26 'jsonl': 'json', 27 'JSON': 'json', 28 'binary': 'bytes', 29 'blob': 'bytes', 30 'varbinary': 'bytes', 31 'bytea': 'bytes', 32 'guid': 'uuid', 33 'UUID': 'uuid', 34 'geom': 'geometry', 35 'geog': 'geography', 36 'boolean': 'bool', 37 'day': 'date', 38} 39MRSM_PD_DTYPES: Dict[Union[str, None], str] = { 40 'json': 'object', 41 'numeric': 'object', 42 'geometry': 'object', 43 'geography': 'object', 44 'uuid': 'object', 45 'date': 'date32[day][pyarrow]', 46 'datetime': 'datetime64[us, UTC]', 47 'bool': 'bool[pyarrow]', 48 'int': 'int64[pyarrow]', 49 'int8': 'int8[pyarrow]', 50 'int16': 'int16[pyarrow]', 51 'int32': 'int32[pyarrow]', 52 'int64': 'int64[pyarrow]', 53 'str': 'string', 54 'bytes': 'binary[pyarrow]', 55 None: 'object', 56} 57 58MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = { 59 'nanosecond': 1_000_000_000, 60 'microsecond': 1_000_000, 61 'millisecond': 1000, 62 'second': 1, 63 'minute': (1 / 60), 64 'hour': (1 / 3600), 65 'day': (1 / 86400), 66} 67 68MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = { 69 'ns': 'nanosecond', 70 'us': 'microsecond', 71 'ms': 'millisecond', 72 's': 'second', 73 'sec': 'second', 74 'm': 'minute', 75 'min': 'minute', 76 'h': 'hour', 77 'hr': 'hour', 78 'd': 'day', 79 'D': 'day', 80} 81MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = { 82 'nanosecond': 'ns', 83 'microsecond': 'us', 84 'millisecond': 'ms', 85 'second': 's', 86 'minute': 'min', 87 'hour': 'hr', 88 'day': 'D', 89} 90 91 92def to_pandas_dtype(dtype: str) -> str: 93 """ 94 Cast a supported Meerschaum dtype to a Pandas dtype. 95 """ 96 known_dtype = MRSM_PD_DTYPES.get(dtype, None) 97 if known_dtype is not None: 98 return known_dtype 99 100 alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None) 101 if alias_dtype is not None: 102 return MRSM_PD_DTYPES[alias_dtype] 103 104 if dtype.startswith('numeric'): 105 return MRSM_PD_DTYPES['numeric'] 106 107 if dtype.startswith('geometry'): 108 return MRSM_PD_DTYPES['geometry'] 109 110 if dtype.startswith('geography'): 111 return MRSM_PD_DTYPES['geography'] 112 113 ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps, 114 ### treat it as a SQL db type. 
115 if dtype.split(' ')[0].isupper(): 116 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 117 return get_pd_type_from_db_type(dtype) 118 119 from meerschaum.utils.packages import attempt_import 120 _ = attempt_import('pyarrow', lazy=False) 121 pandas = attempt_import('pandas', lazy=False) 122 123 try: 124 return str(pandas.api.types.pandas_dtype(dtype)) 125 except Exception: 126 warn( 127 f"Invalid dtype '{dtype}', will use 'object' instead:\n" 128 + f"{traceback.format_exc()}", 129 stack=False, 130 ) 131 return 'object' 132 133 134def are_dtypes_equal( 135 ldtype: Union[str, Dict[str, str]], 136 rdtype: Union[str, Dict[str, str]], 137) -> bool: 138 """ 139 Determine whether two dtype strings may be considered 140 equivalent to avoid unnecessary conversions. 141 142 Parameters 143 ---------- 144 ldtype: Union[str, Dict[str, str]] 145 The left dtype to compare. 146 May also provide a dtypes dictionary. 147 148 rdtype: Union[str, Dict[str, str]] 149 The right dtype to compare. 150 May also provide a dtypes dictionary. 151 152 Returns 153 ------- 154 A `bool` indicating whether the two dtypes are to be considered equivalent. 155 """ 156 if isinstance(ldtype, dict) and isinstance(rdtype, dict): 157 lkeys = sorted([str(k) for k in ldtype.keys()]) 158 rkeys = sorted([str(k) for k in rdtype.keys()]) 159 for lkey, rkey in zip(lkeys, rkeys): 160 if lkey != rkey: 161 return False 162 ltype = ldtype[lkey] 163 rtype = rdtype[rkey] 164 if not are_dtypes_equal(ltype, rtype): 165 return False 166 return True 167 168 try: 169 if ldtype == rdtype: 170 return True 171 except Exception: 172 warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}") 173 return False 174 175 ### Sometimes pandas dtype objects are passed. 176 ldtype = str(ldtype).split('[', maxsplit=1)[0] 177 rdtype = str(rdtype).split('[', maxsplit=1)[0] 178 179 if ldtype in MRSM_ALIAS_DTYPES: 180 ldtype = MRSM_ALIAS_DTYPES[ldtype] 181 182 if rdtype in MRSM_ALIAS_DTYPES: 183 rdtype = MRSM_ALIAS_DTYPES[rdtype] 184 185 json_dtypes = ('json', 'object') 186 if ldtype in json_dtypes and rdtype in json_dtypes: 187 return True 188 189 numeric_dtypes = ('numeric', 'decimal', 'object') 190 if ldtype in numeric_dtypes and rdtype in numeric_dtypes: 191 return True 192 193 uuid_dtypes = ('uuid', 'object') 194 if ldtype in uuid_dtypes and rdtype in uuid_dtypes: 195 return True 196 197 bytes_dtypes = ('bytes', 'object', 'binary') 198 if ldtype in bytes_dtypes and rdtype in bytes_dtypes: 199 return True 200 201 geometry_dtypes = ('geometry', 'object', 'geography') 202 if ldtype in geometry_dtypes and rdtype in geometry_dtypes: 203 return True 204 205 if ldtype.lower() == rdtype.lower(): 206 return True 207 208 datetime_dtypes = ('datetime', 'timestamp') 209 ldtype_found_dt_prefix = False 210 rdtype_found_dt_prefix = False 211 for dt_prefix in datetime_dtypes: 212 ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix 213 rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix 214 if ldtype_found_dt_prefix and rdtype_found_dt_prefix: 215 return True 216 217 string_dtypes = ('str', 'string', 'object') 218 if ldtype in string_dtypes and rdtype in string_dtypes: 219 return True 220 221 int_dtypes = ( 222 'int', 'int64', 'int32', 'int16', 'int8', 223 'uint', 'uint64', 'uint32', 'uint16', 'uint8', 224 ) 225 if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes: 226 return True 227 228 float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 
'double') 229 if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes: 230 return True 231 232 bool_dtypes = ('bool', 'boolean') 233 if ldtype in bool_dtypes and rdtype in bool_dtypes: 234 return True 235 236 date_dtypes = ( 237 'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]', 238 'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]', 239 ) 240 if ldtype in date_dtypes and rdtype in date_dtypes: 241 return True 242 243 return False 244 245 246def is_dtype_numeric(dtype: str) -> bool: 247 """ 248 Determine whether a given `dtype` string 249 should be considered compatible with the Meerschaum dtype `numeric`. 250 251 Parameters 252 ---------- 253 dtype: str 254 The pandas-like dtype string. 255 256 Returns 257 ------- 258 A bool indicating the dtype is compatible with `numeric`. 259 """ 260 dtype_lower = dtype.lower() 261 262 acceptable_substrings = ('numeric', 'float', 'double', 'int') 263 for substring in acceptable_substrings: 264 if substring in dtype_lower: 265 return True 266 267 return False 268 269 270def attempt_cast_to_numeric( 271 value: Any, 272 quantize: bool = False, 273 precision: Optional[int] = None, 274 scale: Optional[int] = None, 275)-> Any: 276 """ 277 Given a value, attempt to coerce it into a numeric (Decimal). 278 279 Parameters 280 ---------- 281 value: Any 282 The value to be cast to a Decimal. 283 284 quantize: bool, default False 285 If `True`, quantize the decimal to the specified precision and scale. 286 287 precision: Optional[int], default None 288 If `quantize` is `True`, use this precision. 289 290 scale: Optional[int], default None 291 If `quantize` is `True`, use this scale. 292 293 Returns 294 ------- 295 A `Decimal` if possible, or `value`. 296 """ 297 if isinstance(value, Decimal): 298 if quantize and precision and scale: 299 return quantize_decimal(value, precision, scale) 300 return value 301 try: 302 if value_is_null(value): 303 return Decimal('NaN') 304 305 dec = Decimal(str(value)) 306 if not quantize or not precision or not scale: 307 return dec 308 return quantize_decimal(dec, precision, scale) 309 except Exception: 310 return value 311 312 313def attempt_cast_to_uuid(value: Any) -> Any: 314 """ 315 Given a value, attempt to coerce it into a UUID (`uuid4`). 316 """ 317 if isinstance(value, uuid.UUID): 318 return value 319 try: 320 return ( 321 uuid.UUID(str(value)) 322 if not value_is_null(value) 323 else None 324 ) 325 except Exception: 326 return value 327 328 329def attempt_cast_to_bytes(value: Any) -> Any: 330 """ 331 Given a value, attempt to coerce it into a bytestring. 332 """ 333 if isinstance(value, bytes): 334 return value 335 try: 336 return ( 337 deserialize_bytes_string(str(value)) 338 if not value_is_null(value) 339 else None 340 ) 341 except Exception: 342 return value 343 344 345def attempt_cast_to_geometry(value: Any) -> Any: 346 """ 347 Given a value, attempt to coerce it into a `shapely` (`geometry`) object. 
348 """ 349 shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import( 350 'shapely', 351 'shapely.wkt', 352 'shapely.wkb', 353 lazy=False, 354 ) 355 if 'shapely' in str(type(value)): 356 return value 357 358 if isinstance(value, (dict, list)): 359 try: 360 return shapely.from_geojson(json.dumps(value)) 361 except Exception: 362 return value 363 364 value_is_gpkg = geometry_is_gpkg(value) 365 if value_is_gpkg: 366 try: 367 wkb_data, _, _ = gpkg_wkb_to_wkb(value) 368 return shapely_wkb.loads(wkb_data) 369 except Exception: 370 return value 371 372 value_is_wkt = geometry_is_wkt(value) 373 if value_is_wkt is None: 374 return value 375 376 try: 377 return ( 378 shapely_wkt.loads(value) 379 if value_is_wkt 380 else shapely_wkb.loads(value) 381 ) 382 except Exception: 383 return value 384 385 386def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]: 387 """ 388 Determine whether an input value should be treated as WKT or WKB geometry data. 389 390 Parameters 391 ---------- 392 value: Union[str, bytes] 393 The input data to be parsed into geometry data. 394 395 Returns 396 ------- 397 A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB). 398 Return `None` if `value` should be parsed as neither. 399 """ 400 import re 401 if not isinstance(value, (str, bytes)): 402 return None 403 404 if isinstance(value, bytes): 405 return False 406 407 wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$' 408 if re.match(wkt_pattern, value, re.IGNORECASE): 409 return True 410 411 if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0: 412 return False 413 414 return None 415 416 417def geometry_is_gpkg(value: bytes) -> bool: 418 """ 419 Return whether the input `value` is formatted as GeoPackage WKB. 420 """ 421 if not isinstance(value, bytes) or len(value) < 2: 422 return False 423 424 return value[0:2] == b'GP' 425 426def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]: 427 """ 428 Converts GeoPackage WKB to standard WKB by removing the header. 429 430 Parameters 431 ---------- 432 gpkg_wkb_bytes: bytes 433 The GeoPackage WKB byte string. 434 435 Returns 436 ------- 437 A tuple containing the standard WKB bytes, SRID, and flags. 438 """ 439 magic_number = gpkg_wkb_bytes[0:2] 440 if magic_number != b'GP': 441 raise ValueError("Invalid GeoPackage WKB header: missing magic number.") 442 443 try: 444 header = gpkg_wkb_bytes[0:8] 445 header_vals = struct.unpack('<ccBBi', header) 446 flags = header_vals[-2] 447 srid = header_vals[-1] 448 except struct.error: 449 header = gpkg_wkb_bytes[0:6] 450 header_vals = struct.unpack('<ccBBh', header) 451 flags = header_vals[-2] 452 srid = header_vals[-1] 453 454 envelope_type = (flags >> 1) & 0x07 455 envelope_sizes = { 456 0: 0, 457 1: 32, 458 2: 48, 459 3: 48, 460 4: 64, 461 } 462 header_length = 8 + envelope_sizes.get(envelope_type, 0) 463 standard_wkb_bytes = gpkg_wkb_bytes[header_length:] 464 return standard_wkb_bytes, srid, flags 465 466 467def value_is_null(value: Any) -> bool: 468 """ 469 Determine if a value is a null-like string. 470 """ 471 return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>') 472 473 474def none_if_null(value: Any) -> Any: 475 """ 476 Return `None` if a value is a null-like string. 
477 """ 478 return (None if value_is_null(value) else value) 479 480 481def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal: 482 """ 483 Quantize a given `Decimal` to a known scale and precision. 484 485 Parameters 486 ---------- 487 x: Decimal 488 The `Decimal` to be quantized. 489 490 precision: int 491 The total number of significant digits. 492 493 scale: int 494 The number of significant digits after the decimal point. 495 496 Returns 497 ------- 498 A `Decimal` quantized to the specified scale and precision. 499 """ 500 precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale)) 501 try: 502 return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP) 503 except InvalidOperation: 504 pass 505 506 raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.") 507 508 509def serialize_decimal( 510 x: Any, 511 quantize: bool = False, 512 precision: Optional[int] = None, 513 scale: Optional[int] = None, 514) -> Any: 515 """ 516 Return a quantized string of an input decimal. 517 518 Parameters 519 ---------- 520 x: Any 521 The potential decimal to be serialized. 522 523 quantize: bool, default False 524 If `True`, quantize the incoming Decimal to the specified scale and precision 525 before serialization. 526 527 precision: Optional[int], default None 528 The precision of the decimal to be quantized. 529 530 scale: Optional[int], default None 531 The scale of the decimal to be quantized. 532 533 Returns 534 ------- 535 A string of the input decimal or the input if not a Decimal. 536 """ 537 if not isinstance(x, Decimal): 538 return x 539 540 if value_is_null(x): 541 return None 542 543 if quantize and scale and precision: 544 x = quantize_decimal(x, precision, scale) 545 546 return f"{x:f}" 547 548 549def coerce_timezone( 550 dt: Any, 551 strip_utc: bool = False, 552) -> Any: 553 """ 554 Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`, 555 return a UTC timestamp (strip timezone if `strip_utc` is `True`. 556 """ 557 if dt is None: 558 return None 559 560 if isinstance(dt, int): 561 return dt 562 563 if isinstance(dt, str): 564 dateutil_parser = mrsm.attempt_import('dateutil.parser') 565 try: 566 dt = dateutil_parser.parse(dt) 567 except Exception: 568 return dt 569 570 dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__') 571 if dt_is_series: 572 pandas = mrsm.attempt_import('pandas', lazy=False) 573 574 if ( 575 pandas.api.types.is_datetime64_any_dtype(dt) and ( 576 (dt.dt.tz is not None and not strip_utc) 577 or 578 (dt.dt.tz is None and strip_utc) 579 ) 580 ): 581 return dt 582 583 dt_series = to_datetime(dt, coerce_utc=False) 584 if dt_series.dt.tz is None: 585 dt_series = dt_series.dt.tz_localize(timezone.utc) 586 if strip_utc: 587 try: 588 if dt_series.dt.tz is not None: 589 dt_series = dt_series.dt.tz_localize(None) 590 except Exception: 591 pass 592 593 return dt_series 594 595 if dt.tzinfo is None: 596 if strip_utc: 597 return dt 598 return dt.replace(tzinfo=timezone.utc) 599 600 utc_dt = dt.astimezone(timezone.utc) 601 if strip_utc: 602 return utc_dt.replace(tzinfo=None) 603 return utc_dt 604 605 606def to_datetime( 607 dt_val: Any, 608 as_pydatetime: bool = False, 609 coerce_utc: bool = True, 610 precision_unit: Optional[str] = None, 611) -> Any: 612 """ 613 Wrap `pd.to_datetime()` and add support for out-of-bounds values. 614 615 Parameters 616 ---------- 617 dt_val: Any 618 The value to coerce to Pandas Timestamps. 
619 620 as_pydatetime: bool, default False 621 If `True`, return a Python datetime object. 622 623 coerce_utc: bool, default True 624 If `True`, ensure the value has UTC tzinfo. 625 626 precision_unit: Optional[str], default None 627 If provided, enforce the provided precision unit. 628 """ 629 pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False) 630 is_dask = 'dask' in getattr(dt_val, '__module__', '') 631 dd = mrsm.attempt_import('dask.dataframe') if is_dask else None 632 dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__') 633 pd = pandas if dd is None else dd 634 enforce_precision = precision_unit is not None 635 precision_unit = precision_unit or 'microsecond' 636 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 637 precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None) 638 if not precision_abbreviation: 639 raise ValueError(f"Invalid precision '{precision_unit}'.") 640 641 def parse(x: Any) -> Any: 642 try: 643 return dateutil_parser.parse(x) 644 except Exception: 645 return x 646 647 def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool: 648 dtype_check_against = ( 649 f"datetime64[{precision_abbreviation}, UTC]" 650 if with_utc 651 else f"datetime64[{precision_abbreviation}]" 652 ) 653 return ( 654 dtype_to_check == dtype_check_against 655 if enforce_precision 656 else ( 657 dtype_to_check.startswith('datetime64[') 658 and ( 659 ('utc' in dtype_to_check.lower()) 660 if with_utc 661 else ('utc' not in dtype_to_check.lower()) 662 ) 663 ) 664 ) 665 666 if isinstance(dt_val, pd.Timestamp): 667 dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime() 668 return ( 669 coerce_timezone(dt_val_to_return) 670 if coerce_utc 671 else dt_val_to_return 672 ) 673 674 if dt_is_series: 675 changed_tz = False 676 original_tz = None 677 dtype = str(getattr(dt_val, 'dtype', 'object')) 678 if ( 679 are_dtypes_equal(dtype, 'datetime') 680 and 'utc' not in dtype.lower() 681 and hasattr(dt_val, 'dt') 682 ): 683 original_tz = dt_val.dt.tz 684 dt_val = dt_val.dt.tz_localize(timezone.utc) 685 changed_tz = True 686 dtype = str(getattr(dt_val, 'dtype', 'object')) 687 try: 688 new_dt_series = ( 689 dt_val 690 if check_dtype(dtype, with_utc=True) 691 else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]") 692 ) 693 except pd.errors.OutOfBoundsDatetime: 694 try: 695 next_precision = get_next_precision_unit(true_precision_unit) 696 next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision] 697 new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]") 698 except Exception: 699 new_dt_series = None 700 except ValueError: 701 new_dt_series = None 702 except TypeError: 703 try: 704 new_dt_series = ( 705 new_dt_series 706 if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False) 707 else dt_val.astype(f"datetime64[{precision_abbreviation}]") 708 ) 709 except Exception: 710 new_dt_series = None 711 712 if new_dt_series is None: 713 new_dt_series = dt_val.apply(lambda x: parse(str(x))) 714 715 if coerce_utc: 716 return coerce_timezone(new_dt_series) 717 718 if changed_tz: 719 new_dt_series = new_dt_series.dt.tz_localize(original_tz) 720 return new_dt_series 721 722 try: 723 new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601') 724 if new_dt_val.unit != precision_abbreviation: 725 new_dt_val = new_dt_val.as_unit(precision_abbreviation) 726 if as_pydatetime: 727 return 
new_dt_val.to_pydatetime() 728 return new_dt_val 729 except (pd.errors.OutOfBoundsDatetime, ValueError): 730 pass 731 732 new_dt_val = parse(dt_val) 733 if not coerce_utc: 734 return new_dt_val 735 return coerce_timezone(new_dt_val) 736 737 738def serialize_bytes(data: bytes) -> str: 739 """ 740 Return the given bytes as a base64-encoded string. 741 """ 742 import base64 743 if not isinstance(data, bytes) and value_is_null(data): 744 return data 745 return base64.b64encode(data).decode('utf-8') 746 747 748def serialize_geometry( 749 geom: Any, 750 geometry_format: str = 'wkb_hex', 751 srid: Optional[int] = None, 752) -> Union[str, Dict[str, Any], bytes, None]: 753 """ 754 Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 755 756 Parameters 757 ---------- 758 geom: Any 759 The potential geometry data to be serialized. 760 761 geometry_format: str, default 'wkb_hex' 762 The serialization format for geometry data. 763 Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`. 764 765 srid: Optional[int], default None 766 If provided, use this as the source CRS when serializing to GeoJSON. 767 768 Returns 769 ------- 770 A string containing the geometry data, or bytes, or a dictionary, or None. 771 """ 772 if value_is_null(geom): 773 return None 774 775 shapely, shapely_ops, pyproj, np = mrsm.attempt_import( 776 'shapely', 'shapely.ops', 'pyproj', 'numpy', 777 lazy=False, 778 ) 779 if geometry_format == 'geojson': 780 if srid: 781 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True) 782 geom = shapely_ops.transform(transformer.transform, geom) 783 geojson_str = shapely.to_geojson(geom) 784 return json.loads(geojson_str) 785 786 if not hasattr(geom, 'wkb_hex'): 787 return str(geom) 788 789 byte_order = 1 if np.little_endian else 0 790 791 if geometry_format.startswith("wkb"): 792 return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True) 793 elif geometry_format == 'gpkg_wkb': 794 wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order) 795 flags = ( 796 ((byte_order & 0x01) | (0x20)) 797 if geom.is_empty 798 else (byte_order & 0x01) 799 ) 800 srid_val = srid or -1 801 header = struct.pack( 802 '<ccBBi', 803 b'G', b'P', 804 0, 805 flags, 806 srid_val 807 ) 808 return header + wkb_data 809 810 return shapely.to_wkt(geom) 811 812 813def deserialize_geometry(geom_wkb: Union[str, bytes]): 814 """ 815 Deserialize a WKB string into a shapely geometry object. 816 """ 817 shapely = mrsm.attempt_import('shapely', lazy=False) 818 return shapely.wkb.loads(geom_wkb) 819 820 821def project_geometry(geom, srid: int, to_srid: int = 4326): 822 """ 823 Project a shapely geometry object to a new CRS (SRID). 824 """ 825 pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False) 826 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True) 827 return shapely_ops.transform(transformer.transform, geom) 828 829 830def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]: 831 """ 832 Given a serialized ASCII string of bytes data, return the original bytes. 833 The input data may either be base64- or hex-encoded. 834 835 Parameters 836 ---------- 837 data: Optional[str] 838 The string to be deserialized into bytes. 839 May be base64- or hex-encoded (prefixed with `'\\x'`). 840 841 force_hex: bool = False 842 If `True`, treat the input string as hex-encoded. 
843 If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`. 844 This will still strip the leading `'\\x'` prefix if present. 845 846 Returns 847 ------- 848 The original bytes used to produce the encoded string `data`. 849 """ 850 if not isinstance(data, str) and value_is_null(data): 851 return data 852 853 import binascii 854 import base64 855 856 is_hex = force_hex or data.startswith('\\x') 857 858 if is_hex: 859 if data.startswith('\\x'): 860 data = data[2:] 861 return binascii.unhexlify(data) 862 863 return base64.b64decode(data) 864 865 866def deserialize_base64(data: str) -> bytes: 867 """ 868 Return the original bytestring from the given base64-encoded string. 869 """ 870 import base64 871 return base64.b64decode(data) 872 873 874def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]: 875 """ 876 Return the given bytes as a hex string for PostgreSQL's `BYTEA` type. 877 """ 878 import binascii 879 if not isinstance(data, bytes) and value_is_null(data): 880 return data 881 return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8') 882 883 884def serialize_datetime(dt: datetime) -> Union[str, None]: 885 """ 886 Serialize a datetime object into JSON (ISO format string). 887 888 Examples 889 -------- 890 >>> import json 891 >>> from datetime import datetime 892 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 893 '{"a": "2022-01-01T00:00:00Z"}' 894 895 """ 896 if not hasattr(dt, 'isoformat'): 897 return None 898 899 tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else '' 900 return dt.isoformat() + tz_suffix 901 902 903def serialize_date(d: date) -> Union[str, None]: 904 """ 905 Serialize a date object into its ISO representation. 906 """ 907 return d.isoformat() if hasattr(d, 'isoformat') else None 908 909 910def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]: 911 """ 912 Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc. 913 914 Parameters 915 ---------- 916 x: Any 917 The value to serialize. 918 919 default_to_str: bool, default True 920 If `True`, return a string of `x` if x is not a designated type. 921 Otherwise return x. 922 923 Returns 924 ------- 925 A serialized version of x, or x. 926 """ 927 if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)): 928 return x.meta 929 930 if hasattr(x, 'tzinfo'): 931 return serialize_datetime(x) 932 933 if hasattr(x, 'isoformat'): 934 return serialize_date(x) 935 936 if isinstance(x, bytes): 937 return serialize_bytes(x) 938 939 if isinstance(x, Decimal): 940 return serialize_decimal(x) 941 942 if 'shapely' in str(type(x)): 943 return serialize_geometry(x) 944 945 if value_is_null(x): 946 return None 947 948 if isinstance(x, (dict, list, tuple)): 949 return json.dumps(x, default=json_serialize_value, separators=(',', ':')) 950 951 return str(x) if default_to_str else x 952 953 954def get_geometry_type_srid( 955 dtype: str = 'geometry', 956 default_type: str = 'geometry', 957 default_srid: int = 4326, 958) -> Union[Tuple[str, int], Tuple[str, None]]: 959 """ 960 Given the specified geometry `dtype`, return a tuple in the form (type, SRID). 961 962 Parameters 963 ---------- 964 dtype: Optional[str], default None 965 Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). 
966 You may specify a supported `shapely` geometry type and an SRID in the dtype modifier: 967 968 - `Point` 969 - `LineString` 970 - `LinearRing` 971 - `Polygon` 972 - `MultiPoint` 973 - `MultiLineString` 974 - `MultiPolygon` 975 - `GeometryCollection` 976 977 Returns 978 ------- 979 A tuple in the form (type, SRID). 980 Defaults to `(default_type, default_srid)`. 981 982 Examples 983 -------- 984 >>> from meerschaum.utils.dtypes import get_geometry_type_srid 985 >>> get_geometry_type_srid() 986 ('geometry', 4326) 987 >>> get_geometry_type_srid('geometry[]') 988 ('geometry', 4326) 989 >>> get_geometry_type_srid('geometry[Point, 0]') 990 ('Point', 0) 991 >>> get_geometry_type_srid('geometry[0, Point]') 992 ('Point', 0) 993 >>> get_geometry_type_srid('geometry[0]') 994 ('geometry', 0) 995 >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]') 996 ('MultiLineString', 4326) 997 >>> get_geometry_type_srid('geography') 998 ('geometry', 4326) 999 >>> get_geometry_type_srid('geography[POINT]') 1000 ('Point', 4376) 1001 """ 1002 from meerschaum.utils.misc import is_int 1003 ### NOTE: PostGIS syntax must also be parsed. 1004 dtype = dtype.replace('(', '[').replace(')', ']') 1005 bare_dtype = dtype.split('[', maxsplit=1)[0] 1006 modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']') 1007 if not modifier: 1008 return default_type, default_srid 1009 1010 parts = [ 1011 part.split('=')[-1].strip() 1012 for part in modifier.split(',') 1013 ] 1014 parts_casted = [ 1015 ( 1016 int(part) 1017 if is_int(part) 1018 else part 1019 ) 1020 for part in parts 1021 ] 1022 1023 srid = default_srid 1024 geometry_type = default_type 1025 1026 for part in parts_casted: 1027 if isinstance(part, int): 1028 srid = part 1029 break 1030 1031 for part in parts_casted: 1032 if isinstance(part, str): 1033 geometry_type = part 1034 break 1035 1036 return geometry_type, srid 1037 1038 1039def get_current_timestamp( 1040 precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'], 1041 precision_interval: int = 1, 1042 round_to: str = 'down', 1043 as_pandas: bool = False, 1044 as_int: bool = False, 1045 _now: Union[datetime, int, None] = None, 1046) -> 'Union[datetime, pd.Timestamp, int]': 1047 """ 1048 Return the current UTC timestamp to nanosecond precision. 1049 1050 Parameters 1051 ---------- 1052 precision_unit: str, default 'us' 1053 The precision of the timestamp to be returned. 1054 Valid values are the following: 1055 - `ns` / `nanosecond` 1056 - `us` / `microsecond` 1057 - `ms` / `millisecond` 1058 - `s` / `sec` / `second` 1059 - `m` / `min` / `minute` 1060 - `h` / `hr` / `hour` 1061 - `d` / `day` 1062 1063 precision_interval: int, default 1 1064 Round the timestamp to the `precision_interval` units. 1065 For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals. 1066 Note: `precision_interval` must be 1 when `precision='nanosecond'`. 1067 1068 round_to: str, default 'down' 1069 The direction to which to round the timestamp. 1070 Available options are `down`, `up`, and `closest`. 1071 1072 as_pandas: bool, default False 1073 If `True`, return a Pandas Timestamp. 1074 This is always true if `unit` is `nanosecond`. 1075 1076 as_int: bool, default False 1077 If `True`, return the timestamp to an integer. 1078 Overrides `as_pandas`. 1079 1080 Returns 1081 ------- 1082 A Pandas Timestamp, datetime object, or integer with precision to the provided unit. 
1083 1084 Examples 1085 -------- 1086 >>> get_current_timestamp('ns') 1087 Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC') 1088 >>> get_current_timestamp('ms') 1089 Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC') 1090 """ 1091 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1092 if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS: 1093 from meerschaum.utils.misc import items_str 1094 raise ValueError( 1095 f"Unknown precision unit '{precision_unit}'. " 1096 "Accepted values are " 1097 f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}." 1098 ) 1099 1100 if not as_int: 1101 as_pandas = as_pandas or true_precision_unit == 'nanosecond' 1102 pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None 1103 1104 if true_precision_unit == 'nanosecond': 1105 if precision_interval != 1: 1106 warn("`precision_interval` must be 1 for nanosecond precision.") 1107 now_ts = time.time_ns() if not isinstance(_now, int) else _now 1108 if as_int: 1109 return now_ts 1110 return pd.to_datetime(now_ts, unit='ns', utc=True) 1111 1112 now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now 1113 delta = timedelta(**{true_precision_unit + 's': precision_interval}) 1114 rounded_now = round_time(now, delta, to=round_to) 1115 1116 if as_int: 1117 return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit]) 1118 1119 ts_val = ( 1120 pd.to_datetime(rounded_now, utc=True) 1121 if as_pandas 1122 else rounded_now 1123 ) 1124 1125 if not as_pandas: 1126 return ts_val 1127 1128 as_unit_precisions = ('microsecond', 'millisecond', 'second') 1129 if true_precision_unit not in as_unit_precisions: 1130 return ts_val 1131 1132 return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit]) 1133 1134 1135def is_dtype_special(type_: str) -> bool: 1136 """ 1137 Return whether a dtype should be treated as a special Meerschaum dtype. 1138 This is not the same as a Meerschaum alias. 1139 """ 1140 true_type = MRSM_ALIAS_DTYPES.get(type_, type_) 1141 if true_type in ( 1142 'uuid', 1143 'json', 1144 'bytes', 1145 'numeric', 1146 'datetime', 1147 'geometry', 1148 'geography', 1149 'date', 1150 'bool', 1151 ): 1152 return True 1153 1154 if are_dtypes_equal(true_type, 'datetime'): 1155 return True 1156 1157 if are_dtypes_equal(true_type, 'date'): 1158 return True 1159 1160 if true_type.startswith('numeric'): 1161 return True 1162 1163 if true_type.startswith('bool'): 1164 return True 1165 1166 if true_type.startswith('geometry'): 1167 return True 1168 1169 if true_type.startswith('geography'): 1170 return True 1171 1172 return False 1173 1174 1175def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str: 1176 """ 1177 Get the next precision string in order of value. 1178 1179 Parameters 1180 ---------- 1181 precision_unit: str 1182 The precision string (`'nanosecond'`, `'ms'`, etc.). 1183 1184 decrease: bool, defaul True 1185 If `True` return the precision unit which is lower (e.g. `nanosecond` -> `millisecond`). 1186 If `False`, return the precision unit which is higher. 1187 1188 Returns 1189 ------- 1190 A `precision` string which is lower or higher than the given precision unit. 
1191 1192 Examples 1193 -------- 1194 >>> get_next_precision_unit('nanosecond') 1195 'microsecond' 1196 >>> get_next_precision_unit('ms') 1197 'second' 1198 >>> get_next_precision_unit('hour', decrease=False) 1199 'minute' 1200 """ 1201 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1202 precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None) 1203 if not precision_scalar: 1204 raise ValueError(f"Invalid precision unit '{precision_unit}'.") 1205 1206 precisions = sorted( 1207 list(MRSM_PRECISION_UNITS_SCALARS), 1208 key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p] 1209 ) 1210 1211 precision_index = precisions.index(true_precision_unit) 1212 new_precision_index = precision_index + (-1 if decrease else 1) 1213 if new_precision_index < 0 or new_precision_index >= len(precisions): 1214 raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.") 1215 1216 return precisions[new_precision_index] 1217 1218 1219def round_time( 1220 dt: Optional[datetime] = None, 1221 date_delta: Optional[timedelta] = None, 1222 to: 'str' = 'down' 1223) -> datetime: 1224 """ 1225 Round a datetime object to a multiple of a timedelta. 1226 1227 Parameters 1228 ---------- 1229 dt: Optional[datetime], default None 1230 If `None`, grab the current UTC datetime. 1231 1232 date_delta: Optional[timedelta], default None 1233 If `None`, use a delta of 1 minute. 1234 1235 to: 'str', default 'down' 1236 Available options are `'up'`, `'down'`, and `'closest'`. 1237 1238 Returns 1239 ------- 1240 A rounded `datetime` object. 1241 1242 Examples 1243 -------- 1244 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 1245 datetime.datetime(2022, 1, 1, 12, 15) 1246 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 1247 datetime.datetime(2022, 1, 1, 12, 16) 1248 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 1249 datetime.datetime(2022, 1, 1, 12, 0) 1250 >>> round_time( 1251 ... datetime(2022, 1, 1, 12, 15, 57, 200), 1252 ... timedelta(hours=1), 1253 ... to = 'closest' 1254 ... ) 1255 datetime.datetime(2022, 1, 1, 12, 0) 1256 >>> round_time( 1257 ... datetime(2022, 1, 1, 12, 45, 57, 200), 1258 ... datetime.timedelta(hours=1), 1259 ... to = 'closest' 1260 ... 
) 1261 datetime.datetime(2022, 1, 1, 13, 0) 1262 1263 """ 1264 from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP 1265 if date_delta is None: 1266 date_delta = timedelta(minutes=1) 1267 1268 if dt is None: 1269 dt = datetime.now(timezone.utc).replace(tzinfo=None) 1270 1271 def get_total_microseconds(td: timedelta) -> int: 1272 return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds 1273 1274 round_to_microseconds = get_total_microseconds(date_delta) 1275 if round_to_microseconds == 0: 1276 return dt 1277 1278 dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min 1279 dt_total_microseconds = get_total_microseconds(dt_delta_from_min) 1280 1281 dt_dec = Decimal(dt_total_microseconds) 1282 round_to_dec = Decimal(round_to_microseconds) 1283 1284 div = dt_dec / round_to_dec 1285 if to == 'down': 1286 num_intervals = div.to_integral_value(rounding=ROUND_DOWN) 1287 elif to == 'up': 1288 num_intervals = div.to_integral_value(rounding=ROUND_UP) 1289 else: 1290 num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP) 1291 1292 rounded_dt_total_microseconds = num_intervals * round_to_dec 1293 adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds 1294 1295 return dt + timedelta(microseconds=adjustment_microseconds)
def to_pandas_dtype(dtype: str) -> str:
Cast a supported Meerschaum dtype to a Pandas dtype.
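A quick usage sketch; the return values follow the `MRSM_PD_DTYPES` and `MRSM_ALIAS_DTYPES` mappings above and are indicative rather than exhaustive.
>>> from meerschaum.utils.dtypes import to_pandas_dtype
>>> to_pandas_dtype('int')
'int64[pyarrow]'
>>> to_pandas_dtype('decimal')   # alias of 'numeric'
'object'
>>> to_pandas_dtype('datetime')
'datetime64[us, UTC]'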
def are_dtypes_equal(ldtype: Union[str, Dict[str, str]], rdtype: Union[str, Dict[str, str]]) -> bool:
Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.
Parameters
- ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
- rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
- A `bool` indicating whether the two dtypes are to be considered equivalent.
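For illustration, a rough sketch of the equivalence rules described above (outputs are indicative):
>>> from meerschaum.utils.dtypes import are_dtypes_equal
>>> are_dtypes_equal('int64[pyarrow]', 'int32')
True
>>> are_dtypes_equal('datetime64[ns, UTC]', 'datetime')
True
>>> are_dtypes_equal('json', 'object')
True
>>> are_dtypes_equal('float64', 'int64')
False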
def is_dtype_numeric(dtype: str) -> bool:
Determine whether a given `dtype` string should be considered compatible with the Meerschaum dtype `numeric`.
Parameters
- dtype (str): The pandas-like dtype string.
Returns
- A bool indicating the dtype is compatible with `numeric`.
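A brief sketch of the substring check described above:
>>> from meerschaum.utils.dtypes import is_dtype_numeric
>>> is_dtype_numeric('float64[pyarrow]')
True
>>> is_dtype_numeric('int32')
True
>>> is_dtype_numeric('string')
False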
def attempt_cast_to_numeric(value: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
Given a value, attempt to coerce it into a numeric (Decimal).
Parameters
- value (Any): The value to be cast to a Decimal.
- quantize (bool, default False): If `True`, quantize the decimal to the specified precision and scale.
- precision (Optional[int], default None): If `quantize` is `True`, use this precision.
- scale (Optional[int], default None): If `quantize` is `True`, use this scale.
Returns
- A `Decimal` if possible, or `value`.
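A short usage sketch; the quantized output assumes the precision/scale behavior of `quantize_decimal` shown later in this module.
>>> from decimal import Decimal
>>> from meerschaum.utils.dtypes import attempt_cast_to_numeric
>>> attempt_cast_to_numeric('3.14159')
Decimal('3.14159')
>>> attempt_cast_to_numeric('3.14159', quantize=True, precision=5, scale=2)
Decimal('3.14')
>>> attempt_cast_to_numeric('not a number')
'not a number'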
def attempt_cast_to_uuid(value: Any) -> Any:
Given a value, attempt to coerce it into a UUID (`uuid4`).
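For illustration (the UUID value below is arbitrary):
>>> from meerschaum.utils.dtypes import attempt_cast_to_uuid
>>> attempt_cast_to_uuid('0f3db276-d557-4a73-8bb2-d2e4e6e9c3a0')
UUID('0f3db276-d557-4a73-8bb2-d2e4e6e9c3a0')
>>> attempt_cast_to_uuid('not-a-uuid')
'not-a-uuid'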
def attempt_cast_to_bytes(value: Any) -> Any:
Given a value, attempt to coerce it into a bytestring.
def attempt_cast_to_geometry(value: Any) -> Any:
Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
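A rough sketch, assuming `shapely` 2.x is installed (the reprs shown are shapely 2.x style):
>>> from meerschaum.utils.dtypes import attempt_cast_to_geometry
>>> attempt_cast_to_geometry('POINT (1 2)')
<POINT (1 2)>
>>> attempt_cast_to_geometry({'type': 'Point', 'coordinates': [1.0, 2.0]})
<POINT (1 2)>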
def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
Determine whether an input value should be treated as WKT or WKB geometry data.
Parameters
- value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
- A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
- Return `None` if `value` should be parsed as neither.
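A short sketch of the three possible outcomes (the hex string is the WKB of a point, written out by hand for illustration):
>>> from meerschaum.utils.dtypes import geometry_is_wkt
>>> geometry_is_wkt('POINT (1 2)')
True
>>> geometry_is_wkt('0101000000000000000000f03f0000000000000040')
False
>>> geometry_is_wkt('not geometry')   # neither WKT nor hex WKB, returns None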
def geometry_is_gpkg(value: bytes) -> bool:
Return whether the input `value` is formatted as GeoPackage WKB.
def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
Converts GeoPackage WKB to standard WKB by removing the header.
Parameters
- gpkg_wkb_bytes (bytes): The GeoPackage WKB byte string.
Returns
- A tuple containing the standard WKB bytes, SRID, and flags.
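As a rough sketch of the header layout described above, a minimal GeoPackage blob can be assembled with `struct` and split back apart; the point WKB bytes here are hand-built purely for illustration.
>>> import struct
>>> from meerschaum.utils.dtypes import gpkg_wkb_to_wkb
>>> point_wkb = struct.pack('<BIdd', 1, 1, 1.0, 2.0)              # little-endian WKB for POINT (1 2)
>>> gpkg_header = struct.pack('<ccBBi', b'G', b'P', 0, 0x01, 4326)  # version 0, flags 0x01, SRID 4326
>>> wkb, srid, flags = gpkg_wkb_to_wkb(gpkg_header + point_wkb)
>>> (wkb == point_wkb, srid, flags)
(True, 4326, 1)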
def value_is_null(value: Any) -> bool:
Determine if a value is a null-like string.
def none_if_null(value: Any) -> Any:
Return `None` if a value is a null-like string.
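A few illustrative calls covering both helpers:
>>> from meerschaum.utils.dtypes import value_is_null, none_if_null
>>> value_is_null('NaT')
True
>>> value_is_null(0)
False
>>> none_if_null('<NA>')   # null-like, returns None
>>> none_if_null(42)
42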
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
Quantize a given `Decimal` to a known scale and precision.
Parameters
- x (Decimal): The `Decimal` to be quantized.
- precision (int): The total number of significant digits.
- scale (int): The number of significant digits after the decimal point.
Returns
- A `Decimal` quantized to the specified scale and precision.
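A short sketch of both the successful and the failing case; the error message follows the `ValueError` raised above.
>>> from decimal import Decimal
>>> from meerschaum.utils.dtypes import quantize_decimal
>>> quantize_decimal(Decimal('123.456'), 6, 2)
Decimal('123.46')
>>> quantize_decimal(Decimal('123.456'), 4, 2)   # needs 5 digits but only 4 are allowed
Traceback (most recent call last):
    ...
ValueError: Cannot quantize value '123.456' to precision=4, scale=2.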
def serialize_decimal(x: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
Return a quantized string of an input decimal.
Parameters
- x (Any): The potential decimal to be serialized.
- quantize (bool, default False): If `True`, quantize the incoming Decimal to the specified scale and precision before serialization.
- precision (Optional[int], default None): The precision of the decimal to be quantized.
- scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
- A string of the input decimal or the input if not a Decimal.
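For illustration (outputs are indicative):
>>> from decimal import Decimal
>>> from meerschaum.utils.dtypes import serialize_decimal
>>> serialize_decimal(Decimal('1.230'))
'1.230'
>>> serialize_decimal(Decimal('2.675'), quantize=True, precision=5, scale=2)
'2.68'
>>> serialize_decimal('not a decimal')
'not a decimal'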
def coerce_timezone(dt: Any, strip_utc: bool = False) -> Any:
Given a `datetime`, pandas `Timestamp`, or `Series` of `Timestamp`, return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
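A brief sketch with plain `datetime` objects:
>>> from datetime import datetime, timezone
>>> from meerschaum.utils.dtypes import coerce_timezone
>>> coerce_timezone(datetime(2024, 1, 1))
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> coerce_timezone(datetime(2024, 1, 1, tzinfo=timezone.utc), strip_utc=True)
datetime.datetime(2024, 1, 1, 0, 0)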
def to_datetime(dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True, precision_unit: Optional[str] = None) -> Any:
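A minimal usage sketch for `to_datetime()` with scalar inputs (the input strings and commented results are illustrative, not exact reprs):

from meerschaum.utils.dtypes import to_datetime

ts = to_datetime('2025-01-01T12:30:00')
# -> a UTC pandas Timestamp at the default (microsecond) precision

py_dt = to_datetime('2025-01-01T12:30:00', as_pydatetime=True)
# -> a timezone-aware datetime.datetime in UTC

ts_seconds = to_datetime('2025-01-01T12:30:00', precision_unit='s')
# -> precision is enforced, so the Timestamp is cast to second ('s') resolution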
def serialize_bytes(data: bytes) -> str:
    """
    Return the given bytes as a base64-encoded string.
    """
    import base64
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return base64.b64encode(data).decode('utf-8')
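`serialize_bytes()` round-trips with `deserialize_bytes_string()` (or `deserialize_base64()`); a minimal sketch:

from meerschaum.utils.dtypes import serialize_bytes, deserialize_bytes_string

payload = b'\x00\x01hello'
encoded = serialize_bytes(payload)                    # base64 string, e.g. 'AAFoZWxsbw=='
assert deserialize_bytes_string(encoded) == payload   # decodes via the base64 branch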
def serialize_geometry(
    geom: Any,
    geometry_format: str = 'wkb_hex',
    srid: Optional[int] = None,
) -> Union[str, Dict[str, Any], bytes, None]:
    """
    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

    Parameters
    ----------
    geom: Any
        The potential geometry data to be serialized.

    geometry_format: str, default 'wkb_hex'
        The serialization format for geometry data.
        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.

    srid: Optional[int], default None
        If provided, use this as the source CRS when serializing to GeoJSON.

    Returns
    -------
    A string containing the geometry data, or bytes, or a dictionary, or None.
    """
    if value_is_null(geom):
        return None

    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
        'shapely', 'shapely.ops', 'pyproj', 'numpy',
        lazy=False,
    )
    if geometry_format == 'geojson':
        if srid:
            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
            geom = shapely_ops.transform(transformer.transform, geom)
        geojson_str = shapely.to_geojson(geom)
        return json.loads(geojson_str)

    if not hasattr(geom, 'wkb_hex'):
        return str(geom)

    byte_order = 1 if np.little_endian else 0

    if geometry_format.startswith("wkb"):
        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
    elif geometry_format == 'gpkg_wkb':
        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
        flags = (
            ((byte_order & 0x01) | (0x20))
            if geom.is_empty
            else (byte_order & 0x01)
        )
        srid_val = srid or -1
        header = struct.pack(
            '<ccBBi',
            b'G', b'P',
            0,
            flags,
            srid_val
        )
        return header + wkb_data

    return shapely.to_wkt(geom)
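A sketch of the supported output formats (assumes `shapely` is installed; the commented values are illustrative):

import shapely
from meerschaum.utils.dtypes import serialize_geometry

point = shapely.Point(-83.5, 35.0)
wkb_hex = serialize_geometry(point)                              # hex WKB string (the default format)
wkb_raw = serialize_geometry(point, geometry_format='wkb')       # raw WKB bytes
wkt_str = serialize_geometry(point, geometry_format='wkt')       # e.g. 'POINT (-83.5 35)'
geojson = serialize_geometry(point, geometry_format='geojson')   # dict, e.g. {'type': 'Point', 'coordinates': [...]}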
def deserialize_geometry(geom_wkb: Union[str, bytes]):
    """
    Deserialize a WKB string into a shapely geometry object.
    """
    shapely = mrsm.attempt_import('shapely', lazy=False)
    return shapely.wkb.loads(geom_wkb)
def project_geometry(geom, srid: int, to_srid: int = 4326):
    """
    Project a shapely geometry object to a new CRS (SRID).
    """
    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
    return shapely_ops.transform(transformer.transform, geom)
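A round-trip and reprojection sketch (assumes `shapely` and `pyproj`; the coordinates and EPSG code are examples):

import shapely
from meerschaum.utils.dtypes import serialize_geometry, deserialize_geometry, project_geometry

point = shapely.Point(500000, 4649776)                        # e.g. a UTM zone 17N coordinate (EPSG:32617)
wkb_bytes = serialize_geometry(point, geometry_format='wkb')  # raw WKB bytes
restored = deserialize_geometry(wkb_bytes)                    # back to a shapely Point
lonlat = project_geometry(restored, srid=32617)               # reproject to the default EPSG:4326 (lon/lat)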
def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
    """
    Given a serialized ASCII string of bytes data, return the original bytes.
    The input data may either be base64- or hex-encoded.

    Parameters
    ----------
    data: Optional[str]
        The string to be deserialized into bytes.
        May be base64- or hex-encoded (prefixed with `'\\x'`).

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded.
        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
        This will still strip the leading `'\\x'` prefix if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    """
    if not isinstance(data, str) and value_is_null(data):
        return data

    import binascii
    import base64

    is_hex = force_hex or data.startswith('\\x')

    if is_hex:
        if data.startswith('\\x'):
            data = data[2:]
        return binascii.unhexlify(data)

    return base64.b64decode(data)
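A sketch of the two decoding branches (values chosen for illustration):

from meerschaum.utils.dtypes import deserialize_bytes_string

deserialize_bytes_string('\\x68656c6c6f')               # hex branch (PostgreSQL-style prefix) -> b'hello'
deserialize_bytes_string('68656c6c6f', force_hex=True)  # hex branch without the prefix -> b'hello'
deserialize_bytes_string('aGVsbG8=')                    # base64 branch -> b'hello'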
def deserialize_base64(data: str) -> bytes:
    """
    Return the original bytestring from the given base64-encoded string.
    """
    import base64
    return base64.b64decode(data)
def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
    """
    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
    """
    import binascii
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
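This pairs with `deserialize_bytes_string()`, whose hex branch recognizes the `'\\x'` prefix; a minimal sketch:

from meerschaum.utils.dtypes import encode_bytes_for_bytea, deserialize_bytes_string

hex_str = encode_bytes_for_bytea(b'hello')        # '\\x68656c6c6f', ready for a BYTEA column
assert deserialize_bytes_string(hex_str) == b'hello'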
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not hasattr(dt, 'isoformat'):
        return None

    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
    return dt.isoformat() + tz_suffix
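Note that naive datetimes get a `'Z'` suffix while timezone-aware datetimes keep the offset from `isoformat()`:

from datetime import datetime, timezone
from meerschaum.utils.dtypes import serialize_datetime

serialize_datetime(datetime(2022, 1, 1))                       # '2022-01-01T00:00:00Z' (naive -> 'Z' appended)
serialize_datetime(datetime(2022, 1, 1, tzinfo=timezone.utc))  # '2022-01-01T00:00:00+00:00'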
def serialize_date(d: date) -> Union[str, None]:
    """
    Serialize a date object into its ISO representation.
    """
    return d.isoformat() if hasattr(d, 'isoformat') else None
def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    """
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    if hasattr(x, 'tzinfo'):
        return serialize_datetime(x)

    if hasattr(x, 'isoformat'):
        return serialize_date(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    if 'shapely' in str(type(x)):
        return serialize_geometry(x)

    if value_is_null(x):
        return None

    if isinstance(x, (dict, list, tuple)):
        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))

    return str(x) if default_to_str else x
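A sketch of using `json_serialize_value` as a `json.dumps` default handler (the `serialize_decimal` helper it dispatches to is defined elsewhere in this module):

import json
from decimal import Decimal
from datetime import datetime, timezone
from meerschaum.utils.dtypes import json_serialize_value

doc = {
    'ts': datetime(2024, 1, 1, tzinfo=timezone.utc),  # -> ISO string via serialize_datetime()
    'amount': Decimal('1.10'),                        # -> handled by serialize_decimal()
    'blob': b'\x00\x01',                              # -> base64 string via serialize_bytes()
}
json.dumps(doc, default=json_serialize_value)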
def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: int = 4326,
) -> Union[Tuple[str, int], Tuple[str, None]]:
    """
    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: str, default 'geometry'
        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 4326)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 4326)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
    ('MultiLineString', 4326)
    >>> get_geometry_type_srid('geography')
    ('geometry', 4326)
    >>> get_geometry_type_srid('geography[POINT]')
    ('Point', 4326)
    """
    from meerschaum.utils.misc import is_int
    ### NOTE: PostGIS syntax must also be parsed.
    dtype = dtype.replace('(', '[').replace(')', ']')
    bare_dtype = dtype.split('[', maxsplit=1)[0]
    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
    if not modifier:
        return default_type, default_srid

    parts = [
        part.split('=')[-1].strip()
        for part in modifier.split(',')
    ]
    parts_casted = [
        (
            int(part)
            if is_int(part)
            else part
        )
        for part in parts
    ]

    srid = default_srid
    geometry_type = default_type

    for part in parts_casted:
        if isinstance(part, int):
            srid = part
            break

    for part in parts_casted:
        if isinstance(part, str):
            geometry_type = part
            break

    return geometry_type, srid
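Because the parser normalizes parentheses to brackets and strips `key=` prefixes, PostGIS-style modifiers also work; a small sketch based on the parsing logic above:

from meerschaum.utils.dtypes import get_geometry_type_srid

get_geometry_type_srid('geometry(LineString, 3857)')  # parentheses are normalized -> ('LineString', 3857)
get_geometry_type_srid('geometry[srid=3857]')         # 'key=value' parts keep only the value -> ('geometry', 3857)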
def get_current_timestamp(
    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
    precision_interval: int = 1,
    round_to: str = 'down',
    as_pandas: bool = False,
    as_int: bool = False,
    _now: Union[datetime, int, None] = None,
) -> 'Union[datetime, pd.Timestamp, int]':
    """
    Return the current UTC timestamp at the specified precision.

    Parameters
    ----------
    precision_unit: str, default 'us'
        The precision of the timestamp to be returned.
        Valid values are the following:
        - `ns` / `nanosecond`
        - `us` / `microsecond`
        - `ms` / `millisecond`
        - `s` / `sec` / `second`
        - `m` / `min` / `minute`
        - `h` / `hr` / `hour`
        - `d` / `day`

    precision_interval: int, default 1
        Round the timestamp to the `precision_interval` units.
        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
        Note: `precision_interval` must be 1 when `precision='nanosecond'`.

    round_to: str, default 'down'
        The direction to which to round the timestamp.
        Available options are `down`, `up`, and `closest`.

    as_pandas: bool, default False
        If `True`, return a Pandas Timestamp.
        This is always true if `precision_unit` is `nanosecond`.

    as_int: bool, default False
        If `True`, return the timestamp as an integer.
        Overrides `as_pandas`.

    Returns
    -------
    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.

    Examples
    --------
    >>> get_current_timestamp('ns')
    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
    >>> get_current_timestamp('ms')
    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
        from meerschaum.utils.misc import items_str
        raise ValueError(
            f"Unknown precision unit '{precision_unit}'. "
            "Accepted values are "
            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
        )

    if not as_int:
        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None

    if true_precision_unit == 'nanosecond':
        if precision_interval != 1:
            warn("`precision_interval` must be 1 for nanosecond precision.")
        now_ts = time.time_ns() if not isinstance(_now, int) else _now
        if as_int:
            return now_ts
        return pd.to_datetime(now_ts, unit='ns', utc=True)

    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
    delta = timedelta(**{true_precision_unit + 's': precision_interval})
    rounded_now = round_time(now, delta, to=round_to)

    if as_int:
        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])

    ts_val = (
        pd.to_datetime(rounded_now, utc=True)
        if as_pandas
        else rounded_now
    )

    if not as_pandas:
        return ts_val

    as_unit_precisions = ('microsecond', 'millisecond', 'second')
    if true_precision_unit not in as_unit_precisions:
        return ts_val

    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
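A sketch of interval rounding and integer output (returned values are illustrative):

from meerschaum.utils.dtypes import get_current_timestamp

get_current_timestamp('minute', precision_interval=15)      # e.g. datetime(2025, 7, 17, 17, 45, tzinfo=timezone.utc)
get_current_timestamp('second', as_int=True)                # integer seconds since the epoch
get_current_timestamp('ms', round_to='up', as_pandas=True)  # pandas Timestamp at millisecond ('ms') resolution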
def is_dtype_special(type_: str) -> bool:
    """
    Return whether a dtype should be treated as a special Meerschaum dtype.
    This is not the same as a Meerschaum alias.
    """
    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
    if true_type in (
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    ):
        return True

    if are_dtypes_equal(true_type, 'datetime'):
        return True

    if are_dtypes_equal(true_type, 'date'):
        return True

    if true_type.startswith('numeric'):
        return True

    if true_type.startswith('bool'):
        return True

    if true_type.startswith('geometry'):
        return True

    if true_type.startswith('geography'):
        return True

    return False
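A few illustrative calls, based on the checks above:

from meerschaum.utils.dtypes import is_dtype_special

is_dtype_special('decimal')              # True  (alias resolves to 'numeric')
is_dtype_special('numeric[10,2]')        # True  (a precision/scale modifier still counts as numeric)
is_dtype_special('datetime64[ns, UTC]')  # True  (recognized as a datetime dtype)
is_dtype_special('int64[pyarrow]')       # False (plain integers are not special)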
def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the next precision string in order of value.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).

    decrease: bool, default True
        If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the precision unit which is higher.

    Returns
    -------
    A `precision` string which is lower or higher than the given precision unit.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
    if not precision_scalar:
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    precisions = sorted(
        list(MRSM_PRECISION_UNITS_SCALARS),
        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
    )

    precision_index = precisions.index(true_precision_unit)
    new_precision_index = precision_index + (-1 if decrease else 1)
    if new_precision_index < 0 or new_precision_index >= len(precisions):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return precisions[new_precision_index]
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...     datetime(2022, 1, 1, 12, 15, 57, 200),
    ...     timedelta(hours=1),
    ...     to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...     datetime(2022, 1, 1, 12, 45, 57, 200),
    ...     timedelta(hours=1),
    ...     to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    def get_total_microseconds(td: timedelta) -> int:
        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds

    round_to_microseconds = get_total_microseconds(date_delta)
    if round_to_microseconds == 0:
        return dt

    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)

    dt_dec = Decimal(dt_total_microseconds)
    round_to_dec = Decimal(round_to_microseconds)

    div = dt_dec / round_to_dec
    if to == 'down':
        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
    elif to == 'up':
        num_intervals = div.to_integral_value(rounding=ROUND_UP)
    else:
        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)

    rounded_dt_total_microseconds = num_intervals * round_to_dec
    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds

    return dt + timedelta(microseconds=adjustment_microseconds)
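One more illustrative example, this time rounding to 15-minute boundaries:

from datetime import datetime, timedelta
from meerschaum.utils.dtypes import round_time

round_time(datetime(2024, 6, 1, 12, 7, 30), timedelta(minutes=15))                # rounds down to 12:00
round_time(datetime(2024, 6, 1, 12, 7, 30), timedelta(minutes=15), to='closest')  # the exact half rounds up to 12:15 (ROUND_HALF_UP)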