meerschaum.utils.dtypes
Utility functions for working with data types.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Utility functions for working with data types. 7""" 8 9import traceback 10import json 11import uuid 12import time 13from datetime import timezone, datetime, date, timedelta 14from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP 15 16import meerschaum as mrsm 17from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple 18from meerschaum.utils.warnings import warn 19from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG 20 21MRSM_ALIAS_DTYPES: Dict[str, str] = { 22 'decimal': 'numeric', 23 'Decimal': 'numeric', 24 'number': 'numeric', 25 'jsonl': 'json', 26 'JSON': 'json', 27 'binary': 'bytes', 28 'blob': 'bytes', 29 'varbinary': 'bytes', 30 'bytea': 'bytes', 31 'guid': 'uuid', 32 'UUID': 'uuid', 33 'geom': 'geometry', 34 'geog': 'geography', 35 'boolean': 'bool', 36 'day': 'date', 37} 38MRSM_PD_DTYPES: Dict[Union[str, None], str] = { 39 'json': 'object', 40 'numeric': 'object', 41 'geometry': 'object', 42 'geography': 'object', 43 'uuid': 'object', 44 'date': 'date32[day][pyarrow]', 45 'datetime': 'datetime64[us, UTC]', 46 'bool': 'bool[pyarrow]', 47 'int': 'int64[pyarrow]', 48 'int8': 'int8[pyarrow]', 49 'int16': 'int16[pyarrow]', 50 'int32': 'int32[pyarrow]', 51 'int64': 'int64[pyarrow]', 52 'str': 'string', 53 'bytes': 'binary[pyarrow]', 54 None: 'object', 55} 56 57MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = { 58 'nanosecond': 1_000_000_000, 59 'microsecond': 1_000_000, 60 'millisecond': 1000, 61 'second': 1, 62 'minute': (1 / 60), 63 'hour': (1 / 3600), 64 'day': (1 / 86400), 65} 66 67MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = { 68 'ns': 'nanosecond', 69 'us': 'microsecond', 70 'ms': 'millisecond', 71 's': 'second', 72 'sec': 'second', 73 'm': 'minute', 74 'min': 'minute', 75 'h': 'hour', 76 'hr': 'hour', 77 'd': 'day', 78 'D': 'day', 79} 80MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = { 81 'nanosecond': 'ns', 82 'microsecond': 'us', 83 'millisecond': 'ms', 84 'second': 's', 85 'minute': 'min', 86 'hour': 'hr', 87 'day': 'D', 88} 89 90 91def to_pandas_dtype(dtype: str) -> str: 92 """ 93 Cast a supported Meerschaum dtype to a Pandas dtype. 94 """ 95 known_dtype = MRSM_PD_DTYPES.get(dtype, None) 96 if known_dtype is not None: 97 return known_dtype 98 99 alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None) 100 if alias_dtype is not None: 101 return MRSM_PD_DTYPES[alias_dtype] 102 103 if dtype.startswith('numeric'): 104 return MRSM_PD_DTYPES['numeric'] 105 106 if dtype.startswith('geometry'): 107 return MRSM_PD_DTYPES['geometry'] 108 109 if dtype.startswith('geography'): 110 return MRSM_PD_DTYPES['geography'] 111 112 ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps, 113 ### treat it as a SQL db type. 
114 if dtype.split(' ')[0].isupper(): 115 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 116 return get_pd_type_from_db_type(dtype) 117 118 from meerschaum.utils.packages import attempt_import 119 _ = attempt_import('pyarrow', lazy=False) 120 pandas = attempt_import('pandas', lazy=False) 121 122 try: 123 return str(pandas.api.types.pandas_dtype(dtype)) 124 except Exception: 125 warn( 126 f"Invalid dtype '{dtype}', will use 'object' instead:\n" 127 + f"{traceback.format_exc()}", 128 stack=False, 129 ) 130 return 'object' 131 132 133def are_dtypes_equal( 134 ldtype: Union[str, Dict[str, str]], 135 rdtype: Union[str, Dict[str, str]], 136) -> bool: 137 """ 138 Determine whether two dtype strings may be considered 139 equivalent to avoid unnecessary conversions. 140 141 Parameters 142 ---------- 143 ldtype: Union[str, Dict[str, str]] 144 The left dtype to compare. 145 May also provide a dtypes dictionary. 146 147 rdtype: Union[str, Dict[str, str]] 148 The right dtype to compare. 149 May also provide a dtypes dictionary. 150 151 Returns 152 ------- 153 A `bool` indicating whether the two dtypes are to be considered equivalent. 154 """ 155 if isinstance(ldtype, dict) and isinstance(rdtype, dict): 156 lkeys = sorted([str(k) for k in ldtype.keys()]) 157 rkeys = sorted([str(k) for k in rdtype.keys()]) 158 for lkey, rkey in zip(lkeys, rkeys): 159 if lkey != rkey: 160 return False 161 ltype = ldtype[lkey] 162 rtype = rdtype[rkey] 163 if not are_dtypes_equal(ltype, rtype): 164 return False 165 return True 166 167 try: 168 if ldtype == rdtype: 169 return True 170 except Exception: 171 warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}") 172 return False 173 174 ### Sometimes pandas dtype objects are passed. 175 ldtype = str(ldtype).split('[', maxsplit=1)[0] 176 rdtype = str(rdtype).split('[', maxsplit=1)[0] 177 178 if ldtype in MRSM_ALIAS_DTYPES: 179 ldtype = MRSM_ALIAS_DTYPES[ldtype] 180 181 if rdtype in MRSM_ALIAS_DTYPES: 182 rdtype = MRSM_ALIAS_DTYPES[rdtype] 183 184 json_dtypes = ('json', 'object') 185 if ldtype in json_dtypes and rdtype in json_dtypes: 186 return True 187 188 numeric_dtypes = ('numeric', 'decimal', 'object') 189 if ldtype in numeric_dtypes and rdtype in numeric_dtypes: 190 return True 191 192 uuid_dtypes = ('uuid', 'object') 193 if ldtype in uuid_dtypes and rdtype in uuid_dtypes: 194 return True 195 196 bytes_dtypes = ('bytes', 'object', 'binary') 197 if ldtype in bytes_dtypes and rdtype in bytes_dtypes: 198 return True 199 200 geometry_dtypes = ('geometry', 'object', 'geography') 201 if ldtype in geometry_dtypes and rdtype in geometry_dtypes: 202 return True 203 204 if ldtype.lower() == rdtype.lower(): 205 return True 206 207 datetime_dtypes = ('datetime', 'timestamp') 208 ldtype_found_dt_prefix = False 209 rdtype_found_dt_prefix = False 210 for dt_prefix in datetime_dtypes: 211 ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix 212 rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix 213 if ldtype_found_dt_prefix and rdtype_found_dt_prefix: 214 return True 215 216 string_dtypes = ('str', 'string', 'object') 217 if ldtype in string_dtypes and rdtype in string_dtypes: 218 return True 219 220 int_dtypes = ( 221 'int', 'int64', 'int32', 'int16', 'int8', 222 'uint', 'uint64', 'uint32', 'uint16', 'uint8', 223 ) 224 if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes: 225 return True 226 227 float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 
'double') 228 if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes: 229 return True 230 231 bool_dtypes = ('bool', 'boolean') 232 if ldtype in bool_dtypes and rdtype in bool_dtypes: 233 return True 234 235 date_dtypes = ( 236 'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]', 237 'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]', 238 ) 239 if ldtype in date_dtypes and rdtype in date_dtypes: 240 return True 241 242 return False 243 244 245def is_dtype_numeric(dtype: str) -> bool: 246 """ 247 Determine whether a given `dtype` string 248 should be considered compatible with the Meerschaum dtype `numeric`. 249 250 Parameters 251 ---------- 252 dtype: str 253 The pandas-like dtype string. 254 255 Returns 256 ------- 257 A bool indicating the dtype is compatible with `numeric`. 258 """ 259 dtype_lower = dtype.lower() 260 261 acceptable_substrings = ('numeric', 'float', 'double', 'int') 262 for substring in acceptable_substrings: 263 if substring in dtype_lower: 264 return True 265 266 return False 267 268 269def attempt_cast_to_numeric( 270 value: Any, 271 quantize: bool = False, 272 precision: Optional[int] = None, 273 scale: Optional[int] = None, 274)-> Any: 275 """ 276 Given a value, attempt to coerce it into a numeric (Decimal). 277 278 Parameters 279 ---------- 280 value: Any 281 The value to be cast to a Decimal. 282 283 quantize: bool, default False 284 If `True`, quantize the decimal to the specified precision and scale. 285 286 precision: Optional[int], default None 287 If `quantize` is `True`, use this precision. 288 289 scale: Optional[int], default None 290 If `quantize` is `True`, use this scale. 291 292 Returns 293 ------- 294 A `Decimal` if possible, or `value`. 295 """ 296 if isinstance(value, Decimal): 297 if quantize and precision and scale: 298 return quantize_decimal(value, precision, scale) 299 return value 300 try: 301 if value_is_null(value): 302 return Decimal('NaN') 303 304 dec = Decimal(str(value)) 305 if not quantize or not precision or not scale: 306 return dec 307 return quantize_decimal(dec, precision, scale) 308 except Exception: 309 return value 310 311 312def attempt_cast_to_uuid(value: Any) -> Any: 313 """ 314 Given a value, attempt to coerce it into a UUID (`uuid4`). 315 """ 316 if isinstance(value, uuid.UUID): 317 return value 318 try: 319 return ( 320 uuid.UUID(str(value)) 321 if not value_is_null(value) 322 else None 323 ) 324 except Exception: 325 return value 326 327 328def attempt_cast_to_bytes(value: Any) -> Any: 329 """ 330 Given a value, attempt to coerce it into a bytestring. 331 """ 332 if isinstance(value, bytes): 333 return value 334 try: 335 return ( 336 deserialize_bytes_string(str(value)) 337 if not value_is_null(value) 338 else None 339 ) 340 except Exception: 341 return value 342 343 344def attempt_cast_to_geometry(value: Any) -> Any: 345 """ 346 Given a value, attempt to coerce it into a `shapely` (`geometry`) object. 
347 """ 348 shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import( 349 'shapely', 350 'shapely.wkt', 351 'shapely.wkb', 352 lazy=False, 353 ) 354 if 'shapely' in str(type(value)): 355 return value 356 357 if isinstance(value, (dict, list)): 358 try: 359 return shapely.from_geojson(json.dumps(value)) 360 except Exception: 361 return value 362 363 value_is_wkt = geometry_is_wkt(value) 364 if value_is_wkt is None: 365 return value 366 367 try: 368 return ( 369 shapely_wkt.loads(value) 370 if value_is_wkt 371 else shapely_wkb.loads(value) 372 ) 373 except Exception: 374 return value 375 376 377def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]: 378 """ 379 Determine whether an input value should be treated as WKT or WKB geometry data. 380 381 Parameters 382 ---------- 383 value: Union[str, bytes] 384 The input data to be parsed into geometry data. 385 386 Returns 387 ------- 388 A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB). 389 Return `None` if `value` should be parsed as neither. 390 """ 391 import re 392 if not isinstance(value, (str, bytes)): 393 return None 394 395 if isinstance(value, bytes): 396 return False 397 398 wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$' 399 if re.match(wkt_pattern, value, re.IGNORECASE): 400 return True 401 402 if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0: 403 return False 404 405 return None 406 407 408def value_is_null(value: Any) -> bool: 409 """ 410 Determine if a value is a null-like string. 411 """ 412 return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>') 413 414 415def none_if_null(value: Any) -> Any: 416 """ 417 Return `None` if a value is a null-like string. 418 """ 419 return (None if value_is_null(value) else value) 420 421 422def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal: 423 """ 424 Quantize a given `Decimal` to a known scale and precision. 425 426 Parameters 427 ---------- 428 x: Decimal 429 The `Decimal` to be quantized. 430 431 precision: int 432 The total number of significant digits. 433 434 scale: int 435 The number of significant digits after the decimal point. 436 437 Returns 438 ------- 439 A `Decimal` quantized to the specified scale and precision. 440 """ 441 precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale)) 442 try: 443 return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP) 444 except InvalidOperation: 445 pass 446 447 raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.") 448 449 450def serialize_decimal( 451 x: Any, 452 quantize: bool = False, 453 precision: Optional[int] = None, 454 scale: Optional[int] = None, 455) -> Any: 456 """ 457 Return a quantized string of an input decimal. 458 459 Parameters 460 ---------- 461 x: Any 462 The potential decimal to be serialized. 463 464 quantize: bool, default False 465 If `True`, quantize the incoming Decimal to the specified scale and precision 466 before serialization. 467 468 precision: Optional[int], default None 469 The precision of the decimal to be quantized. 470 471 scale: Optional[int], default None 472 The scale of the decimal to be quantized. 473 474 Returns 475 ------- 476 A string of the input decimal or the input if not a Decimal. 
477 """ 478 if not isinstance(x, Decimal): 479 return x 480 481 if value_is_null(x): 482 return None 483 484 if quantize and scale and precision: 485 x = quantize_decimal(x, precision, scale) 486 487 return f"{x:f}" 488 489 490def coerce_timezone( 491 dt: Any, 492 strip_utc: bool = False, 493) -> Any: 494 """ 495 Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`, 496 return a UTC timestamp (strip timezone if `strip_utc` is `True`. 497 """ 498 if dt is None: 499 return None 500 501 if isinstance(dt, int): 502 return dt 503 504 if isinstance(dt, str): 505 dateutil_parser = mrsm.attempt_import('dateutil.parser') 506 try: 507 dt = dateutil_parser.parse(dt) 508 except Exception: 509 return dt 510 511 dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__') 512 if dt_is_series: 513 pandas = mrsm.attempt_import('pandas', lazy=False) 514 515 if ( 516 pandas.api.types.is_datetime64_any_dtype(dt) and ( 517 (dt.dt.tz is not None and not strip_utc) 518 or 519 (dt.dt.tz is None and strip_utc) 520 ) 521 ): 522 return dt 523 524 dt_series = to_datetime(dt, coerce_utc=False) 525 if dt_series.dt.tz is None: 526 dt_series = dt_series.dt.tz_localize(timezone.utc) 527 if strip_utc: 528 try: 529 if dt_series.dt.tz is not None: 530 dt_series = dt_series.dt.tz_localize(None) 531 except Exception: 532 pass 533 534 return dt_series 535 536 if dt.tzinfo is None: 537 if strip_utc: 538 return dt 539 return dt.replace(tzinfo=timezone.utc) 540 541 utc_dt = dt.astimezone(timezone.utc) 542 if strip_utc: 543 return utc_dt.replace(tzinfo=None) 544 return utc_dt 545 546 547def to_datetime( 548 dt_val: Any, 549 as_pydatetime: bool = False, 550 coerce_utc: bool = True, 551 precision_unit: Optional[str] = None, 552) -> Any: 553 """ 554 Wrap `pd.to_datetime()` and add support for out-of-bounds values. 555 556 Parameters 557 ---------- 558 dt_val: Any 559 The value to coerce to Pandas Timestamps. 560 561 as_pydatetime: bool, default False 562 If `True`, return a Python datetime object. 563 564 coerce_utc: bool, default True 565 If `True`, ensure the value has UTC tzinfo. 566 567 precision_unit: Optional[str], default None 568 If provided, enforce the provided precision unit. 
569 """ 570 pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False) 571 is_dask = 'dask' in getattr(dt_val, '__module__', '') 572 dd = mrsm.attempt_import('dask.dataframe') if is_dask else None 573 dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__') 574 pd = pandas if dd is None else dd 575 enforce_precision = precision_unit is not None 576 precision_unit = precision_unit or 'microsecond' 577 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 578 precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None) 579 if not precision_abbreviation: 580 raise ValueError(f"Invalid precision '{precision_unit}'.") 581 582 def parse(x: Any) -> Any: 583 try: 584 return dateutil_parser.parse(x) 585 except Exception: 586 return x 587 588 def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool: 589 dtype_check_against = ( 590 f"datetime64[{precision_abbreviation}, UTC]" 591 if with_utc 592 else f"datetime64[{precision_abbreviation}]" 593 ) 594 return ( 595 dtype_to_check == dtype_check_against 596 if enforce_precision 597 else ( 598 dtype_to_check.startswith('datetime64[') 599 and ( 600 ('utc' in dtype_to_check.lower()) 601 if with_utc 602 else ('utc' not in dtype_to_check.lower()) 603 ) 604 ) 605 ) 606 607 if isinstance(dt_val, pd.Timestamp): 608 dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime() 609 return ( 610 coerce_timezone(dt_val_to_return) 611 if coerce_utc 612 else dt_val_to_return 613 ) 614 615 if dt_is_series: 616 changed_tz = False 617 original_tz = None 618 dtype = str(getattr(dt_val, 'dtype', 'object')) 619 if ( 620 are_dtypes_equal(dtype, 'datetime') 621 and 'utc' not in dtype.lower() 622 and hasattr(dt_val, 'dt') 623 ): 624 original_tz = dt_val.dt.tz 625 dt_val = dt_val.dt.tz_localize(timezone.utc) 626 changed_tz = True 627 dtype = str(getattr(dt_val, 'dtype', 'object')) 628 try: 629 new_dt_series = ( 630 dt_val 631 if check_dtype(dtype, with_utc=True) 632 else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]") 633 ) 634 except pd.errors.OutOfBoundsDatetime: 635 try: 636 next_precision = get_next_precision_unit(true_precision_unit) 637 next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision] 638 new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]") 639 except Exception: 640 new_dt_series = None 641 except ValueError: 642 new_dt_series = None 643 except TypeError: 644 try: 645 new_dt_series = ( 646 new_dt_series 647 if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False) 648 else dt_val.astype(f"datetime64[{precision_abbreviation}]") 649 ) 650 except Exception: 651 new_dt_series = None 652 653 if new_dt_series is None: 654 new_dt_series = dt_val.apply(lambda x: parse(str(x))) 655 656 if coerce_utc: 657 return coerce_timezone(new_dt_series) 658 659 if changed_tz: 660 new_dt_series = new_dt_series.dt.tz_localize(original_tz) 661 return new_dt_series 662 663 try: 664 new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601') 665 if new_dt_val.unit != precision_abbreviation: 666 new_dt_val = new_dt_val.as_unit(precision_abbreviation) 667 if as_pydatetime: 668 return new_dt_val.to_pydatetime() 669 return new_dt_val 670 except (pd.errors.OutOfBoundsDatetime, ValueError): 671 pass 672 673 new_dt_val = parse(dt_val) 674 if not coerce_utc: 675 return new_dt_val 676 return coerce_timezone(new_dt_val) 677 678 679def serialize_bytes(data: bytes) -> str: 680 """ 681 
Return the given bytes as a base64-encoded string. 682 """ 683 import base64 684 if not isinstance(data, bytes) and value_is_null(data): 685 return data 686 return base64.b64encode(data).decode('utf-8') 687 688 689def serialize_geometry( 690 geom: Any, 691 geometry_format: str = 'wkb_hex', 692 srid: Optional[int] = None, 693) -> Union[str, Dict[str, Any], None]: 694 """ 695 Serialize geometry data as a hex-encoded well-known-binary string. 696 697 Parameters 698 ---------- 699 geom: Any 700 The potential geometry data to be serialized. 701 702 geometry_format: str, default 'wkb_hex' 703 The serialization format for geometry data. 704 Accepted formats are `wkb_hex` (well-known binary hex string), 705 `wkt` (well-known text), and `geojson`. 706 707 srid: Optional[int], default None 708 If provided, use this as the source CRS when serializing to GeoJSON. 709 710 Returns 711 ------- 712 A string containing the geometry data. 713 """ 714 if value_is_null(geom): 715 return None 716 shapely, shapely_ops, pyproj = mrsm.attempt_import( 717 'shapely', 'shapely.ops', 'pyproj', 718 lazy=False, 719 ) 720 if geometry_format == 'geojson': 721 if srid: 722 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True) 723 geom = shapely_ops.transform(transformer.transform, geom) 724 geojson_str = shapely.to_geojson(geom) 725 return json.loads(geojson_str) 726 727 if hasattr(geom, 'wkb_hex'): 728 if geometry_format == "wkb_hex": 729 return shapely.to_wkb(geom, hex=True, include_srid=True) 730 return shapely.to_wkt(geom) 731 732 return str(geom) 733 734 735def deserialize_geometry(geom_wkb: Union[str, bytes]): 736 """ 737 Deserialize a WKB string into a shapely geometry object. 738 """ 739 shapely = mrsm.attempt_import('shapely', lazy=False) 740 return shapely.wkb.loads(geom_wkb) 741 742 743def project_geometry(geom, srid: int, to_srid: int = 4326): 744 """ 745 Project a shapely geometry object to a new CRS (SRID). 746 """ 747 pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False) 748 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True) 749 return shapely_ops.transform(transformer.transform, geom) 750 751 752def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]: 753 """ 754 Given a serialized ASCII string of bytes data, return the original bytes. 755 The input data may either be base64- or hex-encoded. 756 757 Parameters 758 ---------- 759 data: Optional[str] 760 The string to be deserialized into bytes. 761 May be base64- or hex-encoded (prefixed with `'\\x'`). 762 763 force_hex: bool = False 764 If `True`, treat the input string as hex-encoded. 765 If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`. 766 This will still strip the leading `'\\x'` prefix if present. 767 768 Returns 769 ------- 770 The original bytes used to produce the encoded string `data`. 771 """ 772 if not isinstance(data, str) and value_is_null(data): 773 return data 774 775 import binascii 776 import base64 777 778 is_hex = force_hex or data.startswith('\\x') 779 780 if is_hex: 781 if data.startswith('\\x'): 782 data = data[2:] 783 return binascii.unhexlify(data) 784 785 return base64.b64decode(data) 786 787 788def deserialize_base64(data: str) -> bytes: 789 """ 790 Return the original bytestring from the given base64-encoded string. 
791 """ 792 import base64 793 return base64.b64decode(data) 794 795 796def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]: 797 """ 798 Return the given bytes as a hex string for PostgreSQL's `BYTEA` type. 799 """ 800 import binascii 801 if not isinstance(data, bytes) and value_is_null(data): 802 return data 803 return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8') 804 805 806def serialize_datetime(dt: datetime) -> Union[str, None]: 807 """ 808 Serialize a datetime object into JSON (ISO format string). 809 810 Examples 811 -------- 812 >>> import json 813 >>> from datetime import datetime 814 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 815 '{"a": "2022-01-01T00:00:00Z"}' 816 817 """ 818 if not hasattr(dt, 'isoformat'): 819 return None 820 821 tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else '' 822 return dt.isoformat() + tz_suffix 823 824 825def serialize_date(d: date) -> Union[str, None]: 826 """ 827 Serialize a date object into its ISO representation. 828 """ 829 return d.isoformat() if hasattr(d, 'isoformat') else None 830 831 832def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]: 833 """ 834 Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc. 835 836 Parameters 837 ---------- 838 x: Any 839 The value to serialize. 840 841 default_to_str: bool, default True 842 If `True`, return a string of `x` if x is not a designated type. 843 Otherwise return x. 844 845 Returns 846 ------- 847 A serialized version of x, or x. 848 """ 849 if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)): 850 return x.meta 851 852 if hasattr(x, 'tzinfo'): 853 return serialize_datetime(x) 854 855 if hasattr(x, 'isoformat'): 856 return serialize_date(x) 857 858 if isinstance(x, bytes): 859 return serialize_bytes(x) 860 861 if isinstance(x, Decimal): 862 return serialize_decimal(x) 863 864 if 'shapely' in str(type(x)): 865 return serialize_geometry(x) 866 867 if value_is_null(x): 868 return None 869 870 if isinstance(x, (dict, list, tuple)): 871 return json.dumps(x, default=json_serialize_value, separators=(',', ':')) 872 873 return str(x) if default_to_str else x 874 875 876def get_geometry_type_srid( 877 dtype: str = 'geometry', 878 default_type: str = 'geometry', 879 default_srid: int = 4326, 880) -> Union[Tuple[str, int], Tuple[str, None]]: 881 """ 882 Given the specified geometry `dtype`, return a tuple in the form (type, SRID). 883 884 Parameters 885 ---------- 886 dtype: Optional[str], default None 887 Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). 888 You may specify a supported `shapely` geometry type and an SRID in the dtype modifier: 889 890 - `Point` 891 - `LineString` 892 - `LinearRing` 893 - `Polygon` 894 - `MultiPoint` 895 - `MultiLineString` 896 - `MultiPolygon` 897 - `GeometryCollection` 898 899 Returns 900 ------- 901 A tuple in the form (type, SRID). 902 Defaults to `(default_type, default_srid)`. 
903 904 Examples 905 -------- 906 >>> from meerschaum.utils.dtypes import get_geometry_type_srid 907 >>> get_geometry_type_srid() 908 ('geometry', 4326) 909 >>> get_geometry_type_srid('geometry[]') 910 ('geometry', 4326) 911 >>> get_geometry_type_srid('geometry[Point, 0]') 912 ('Point', 0) 913 >>> get_geometry_type_srid('geometry[0, Point]') 914 ('Point', 0) 915 >>> get_geometry_type_srid('geometry[0]') 916 ('geometry', 0) 917 >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]') 918 ('MultiLineString', 4326) 919 >>> get_geometry_type_srid('geography') 920 ('geometry', 4326) 921 >>> get_geometry_type_srid('geography[POINT]') 922 ('Point', 4376) 923 """ 924 from meerschaum.utils.misc import is_int 925 ### NOTE: PostGIS syntax must also be parsed. 926 dtype = dtype.replace('(', '[').replace(')', ']') 927 bare_dtype = dtype.split('[', maxsplit=1)[0] 928 modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']') 929 if not modifier: 930 return default_type, default_srid 931 932 parts = [ 933 part.split('=')[-1].strip() 934 for part in modifier.split(',') 935 ] 936 parts_casted = [ 937 ( 938 int(part) 939 if is_int(part) 940 else part 941 ) 942 for part in parts 943 ] 944 945 srid = default_srid 946 geometry_type = default_type 947 948 for part in parts_casted: 949 if isinstance(part, int): 950 srid = part 951 break 952 953 for part in parts_casted: 954 if isinstance(part, str): 955 geometry_type = part 956 break 957 958 return geometry_type, srid 959 960 961def get_current_timestamp( 962 precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'], 963 precision_interval: int = 1, 964 round_to: str = 'down', 965 as_pandas: bool = False, 966 as_int: bool = False, 967 _now: Union[datetime, int, None] = None, 968) -> 'Union[datetime, pd.Timestamp, int]': 969 """ 970 Return the current UTC timestamp to nanosecond precision. 971 972 Parameters 973 ---------- 974 precision_unit: str, default 'us' 975 The precision of the timestamp to be returned. 976 Valid values are the following: 977 - `ns` / `nanosecond` 978 - `us` / `microsecond` 979 - `ms` / `millisecond` 980 - `s` / `sec` / `second` 981 - `m` / `min` / `minute` 982 - `h` / `hr` / `hour` 983 - `d` / `day` 984 985 precision_interval: int, default 1 986 Round the timestamp to the `precision_interval` units. 987 For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals. 988 Note: `precision_interval` must be 1 when `precision='nanosecond'`. 989 990 round_to: str, default 'down' 991 The direction to which to round the timestamp. 992 Available options are `down`, `up`, and `closest`. 993 994 as_pandas: bool, default False 995 If `True`, return a Pandas Timestamp. 996 This is always true if `unit` is `nanosecond`. 997 998 as_int: bool, default False 999 If `True`, return the timestamp to an integer. 1000 Overrides `as_pandas`. 1001 1002 Returns 1003 ------- 1004 A Pandas Timestamp, datetime object, or integer with precision to the provided unit. 1005 1006 Examples 1007 -------- 1008 >>> get_current_timestamp('ns') 1009 Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC') 1010 >>> get_current_timestamp('ms') 1011 Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC') 1012 """ 1013 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1014 if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS: 1015 from meerschaum.utils.misc import items_str 1016 raise ValueError( 1017 f"Unknown precision unit '{precision_unit}'. 
" 1018 "Accepted values are " 1019 f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}." 1020 ) 1021 1022 if not as_int: 1023 as_pandas = as_pandas or true_precision_unit == 'nanosecond' 1024 pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None 1025 1026 if true_precision_unit == 'nanosecond': 1027 if precision_interval != 1: 1028 warn("`precision_interval` must be 1 for nanosecond precision.") 1029 now_ts = time.time_ns() if not isinstance(_now, int) else _now 1030 if as_int: 1031 return now_ts 1032 return pd.to_datetime(now_ts, unit='ns', utc=True) 1033 1034 now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now 1035 delta = timedelta(**{true_precision_unit + 's': precision_interval}) 1036 rounded_now = round_time(now, delta, to=round_to) 1037 1038 if as_int: 1039 return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit]) 1040 1041 ts_val = ( 1042 pd.to_datetime(rounded_now, utc=True) 1043 if as_pandas 1044 else rounded_now 1045 ) 1046 1047 if not as_pandas: 1048 return ts_val 1049 1050 as_unit_precisions = ('microsecond', 'millisecond', 'second') 1051 if true_precision_unit not in as_unit_precisions: 1052 return ts_val 1053 1054 return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit]) 1055 1056 1057def is_dtype_special(type_: str) -> bool: 1058 """ 1059 Return whether a dtype should be treated as a special Meerschaum dtype. 1060 This is not the same as a Meerschaum alias. 1061 """ 1062 true_type = MRSM_ALIAS_DTYPES.get(type_, type_) 1063 if true_type in ( 1064 'uuid', 1065 'json', 1066 'bytes', 1067 'numeric', 1068 'datetime', 1069 'geometry', 1070 'geography', 1071 'date', 1072 'bool', 1073 ): 1074 return True 1075 1076 if are_dtypes_equal(true_type, 'datetime'): 1077 return True 1078 1079 if are_dtypes_equal(true_type, 'date'): 1080 return True 1081 1082 if true_type.startswith('numeric'): 1083 return True 1084 1085 if true_type.startswith('bool'): 1086 return True 1087 1088 if true_type.startswith('geometry'): 1089 return True 1090 1091 if true_type.startswith('geography'): 1092 return True 1093 1094 return False 1095 1096 1097def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str: 1098 """ 1099 Get the next precision string in order of value. 1100 1101 Parameters 1102 ---------- 1103 precision_unit: str 1104 The precision string (`'nanosecond'`, `'ms'`, etc.). 1105 1106 decrease: bool, defaul True 1107 If `True` return the precision unit which is lower (e.g. `nanosecond` -> `millisecond`). 1108 If `False`, return the precision unit which is higher. 1109 1110 Returns 1111 ------- 1112 A `precision` string which is lower or higher than the given precision unit. 
1113 1114 Examples 1115 -------- 1116 >>> get_next_precision_unit('nanosecond') 1117 'microsecond' 1118 >>> get_next_precision_unit('ms') 1119 'second' 1120 >>> get_next_precision_unit('hour', decrease=False) 1121 'minute' 1122 """ 1123 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1124 precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None) 1125 if not precision_scalar: 1126 raise ValueError(f"Invalid precision unit '{precision_unit}'.") 1127 1128 precisions = sorted( 1129 list(MRSM_PRECISION_UNITS_SCALARS), 1130 key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p] 1131 ) 1132 1133 precision_index = precisions.index(true_precision_unit) 1134 new_precision_index = precision_index + (-1 if decrease else 1) 1135 if new_precision_index < 0 or new_precision_index >= len(precisions): 1136 raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.") 1137 1138 return precisions[new_precision_index] 1139 1140 1141def round_time( 1142 dt: Optional[datetime] = None, 1143 date_delta: Optional[timedelta] = None, 1144 to: 'str' = 'down' 1145) -> datetime: 1146 """ 1147 Round a datetime object to a multiple of a timedelta. 1148 1149 Parameters 1150 ---------- 1151 dt: Optional[datetime], default None 1152 If `None`, grab the current UTC datetime. 1153 1154 date_delta: Optional[timedelta], default None 1155 If `None`, use a delta of 1 minute. 1156 1157 to: 'str', default 'down' 1158 Available options are `'up'`, `'down'`, and `'closest'`. 1159 1160 Returns 1161 ------- 1162 A rounded `datetime` object. 1163 1164 Examples 1165 -------- 1166 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 1167 datetime.datetime(2022, 1, 1, 12, 15) 1168 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 1169 datetime.datetime(2022, 1, 1, 12, 16) 1170 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 1171 datetime.datetime(2022, 1, 1, 12, 0) 1172 >>> round_time( 1173 ... datetime(2022, 1, 1, 12, 15, 57, 200), 1174 ... timedelta(hours=1), 1175 ... to = 'closest' 1176 ... ) 1177 datetime.datetime(2022, 1, 1, 12, 0) 1178 >>> round_time( 1179 ... datetime(2022, 1, 1, 12, 45, 57, 200), 1180 ... datetime.timedelta(hours=1), 1181 ... to = 'closest' 1182 ... 
) 1183 datetime.datetime(2022, 1, 1, 13, 0) 1184 1185 """ 1186 from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP 1187 if date_delta is None: 1188 date_delta = timedelta(minutes=1) 1189 1190 if dt is None: 1191 dt = datetime.now(timezone.utc).replace(tzinfo=None) 1192 1193 def get_total_microseconds(td: timedelta) -> int: 1194 return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds 1195 1196 round_to_microseconds = get_total_microseconds(date_delta) 1197 if round_to_microseconds == 0: 1198 return dt 1199 1200 dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min 1201 dt_total_microseconds = get_total_microseconds(dt_delta_from_min) 1202 1203 dt_dec = Decimal(dt_total_microseconds) 1204 round_to_dec = Decimal(round_to_microseconds) 1205 1206 div = dt_dec / round_to_dec 1207 if to == 'down': 1208 num_intervals = div.to_integral_value(rounding=ROUND_DOWN) 1209 elif to == 'up': 1210 num_intervals = div.to_integral_value(rounding=ROUND_UP) 1211 else: 1212 num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP) 1213 1214 rounded_dt_total_microseconds = num_intervals * round_to_dec 1215 adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds 1216 1217 return dt + timedelta(microseconds=adjustment_microseconds)
92def to_pandas_dtype(dtype: str) -> str: 93 """ 94 Cast a supported Meerschaum dtype to a Pandas dtype. 95 """ 96 known_dtype = MRSM_PD_DTYPES.get(dtype, None) 97 if known_dtype is not None: 98 return known_dtype 99 100 alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None) 101 if alias_dtype is not None: 102 return MRSM_PD_DTYPES[alias_dtype] 103 104 if dtype.startswith('numeric'): 105 return MRSM_PD_DTYPES['numeric'] 106 107 if dtype.startswith('geometry'): 108 return MRSM_PD_DTYPES['geometry'] 109 110 if dtype.startswith('geography'): 111 return MRSM_PD_DTYPES['geography'] 112 113 ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps, 114 ### treat it as a SQL db type. 115 if dtype.split(' ')[0].isupper(): 116 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 117 return get_pd_type_from_db_type(dtype) 118 119 from meerschaum.utils.packages import attempt_import 120 _ = attempt_import('pyarrow', lazy=False) 121 pandas = attempt_import('pandas', lazy=False) 122 123 try: 124 return str(pandas.api.types.pandas_dtype(dtype)) 125 except Exception: 126 warn( 127 f"Invalid dtype '{dtype}', will use 'object' instead:\n" 128 + f"{traceback.format_exc()}", 129 stack=False, 130 ) 131 return 'object'
Cast a supported Meerschaum dtype to a Pandas dtype.
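A quick sketch of how a few Meerschaum dtypes resolve (results follow the `MRSM_PD_DTYPES` and `MRSM_ALIAS_DTYPES` mappings in the source above; values are illustrative):

from meerschaum.utils.dtypes import to_pandas_dtype

to_pandas_dtype('datetime')        # 'datetime64[us, UTC]'
to_pandas_dtype('int')             # 'int64[pyarrow]'
to_pandas_dtype('decimal')         # 'object' (alias of 'numeric')
to_pandas_dtype('numeric[10,2]')   # 'object' (precision/scale modifiers are accepted)
to_pandas_dtype('float64')         # 'float64' (falls through to pandas.api.types.pandas_dtype)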
134def are_dtypes_equal( 135 ldtype: Union[str, Dict[str, str]], 136 rdtype: Union[str, Dict[str, str]], 137) -> bool: 138 """ 139 Determine whether two dtype strings may be considered 140 equivalent to avoid unnecessary conversions. 141 142 Parameters 143 ---------- 144 ldtype: Union[str, Dict[str, str]] 145 The left dtype to compare. 146 May also provide a dtypes dictionary. 147 148 rdtype: Union[str, Dict[str, str]] 149 The right dtype to compare. 150 May also provide a dtypes dictionary. 151 152 Returns 153 ------- 154 A `bool` indicating whether the two dtypes are to be considered equivalent. 155 """ 156 if isinstance(ldtype, dict) and isinstance(rdtype, dict): 157 lkeys = sorted([str(k) for k in ldtype.keys()]) 158 rkeys = sorted([str(k) for k in rdtype.keys()]) 159 for lkey, rkey in zip(lkeys, rkeys): 160 if lkey != rkey: 161 return False 162 ltype = ldtype[lkey] 163 rtype = rdtype[rkey] 164 if not are_dtypes_equal(ltype, rtype): 165 return False 166 return True 167 168 try: 169 if ldtype == rdtype: 170 return True 171 except Exception: 172 warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}") 173 return False 174 175 ### Sometimes pandas dtype objects are passed. 176 ldtype = str(ldtype).split('[', maxsplit=1)[0] 177 rdtype = str(rdtype).split('[', maxsplit=1)[0] 178 179 if ldtype in MRSM_ALIAS_DTYPES: 180 ldtype = MRSM_ALIAS_DTYPES[ldtype] 181 182 if rdtype in MRSM_ALIAS_DTYPES: 183 rdtype = MRSM_ALIAS_DTYPES[rdtype] 184 185 json_dtypes = ('json', 'object') 186 if ldtype in json_dtypes and rdtype in json_dtypes: 187 return True 188 189 numeric_dtypes = ('numeric', 'decimal', 'object') 190 if ldtype in numeric_dtypes and rdtype in numeric_dtypes: 191 return True 192 193 uuid_dtypes = ('uuid', 'object') 194 if ldtype in uuid_dtypes and rdtype in uuid_dtypes: 195 return True 196 197 bytes_dtypes = ('bytes', 'object', 'binary') 198 if ldtype in bytes_dtypes and rdtype in bytes_dtypes: 199 return True 200 201 geometry_dtypes = ('geometry', 'object', 'geography') 202 if ldtype in geometry_dtypes and rdtype in geometry_dtypes: 203 return True 204 205 if ldtype.lower() == rdtype.lower(): 206 return True 207 208 datetime_dtypes = ('datetime', 'timestamp') 209 ldtype_found_dt_prefix = False 210 rdtype_found_dt_prefix = False 211 for dt_prefix in datetime_dtypes: 212 ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix 213 rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix 214 if ldtype_found_dt_prefix and rdtype_found_dt_prefix: 215 return True 216 217 string_dtypes = ('str', 'string', 'object') 218 if ldtype in string_dtypes and rdtype in string_dtypes: 219 return True 220 221 int_dtypes = ( 222 'int', 'int64', 'int32', 'int16', 'int8', 223 'uint', 'uint64', 'uint32', 'uint16', 'uint8', 224 ) 225 if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes: 226 return True 227 228 float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double') 229 if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes: 230 return True 231 232 bool_dtypes = ('bool', 'boolean') 233 if ldtype in bool_dtypes and rdtype in bool_dtypes: 234 return True 235 236 date_dtypes = ( 237 'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]', 238 'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]', 239 ) 240 if ldtype in date_dtypes and rdtype in date_dtypes: 241 return True 242 243 return False
Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.
Parameters
- ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
- rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
- A `bool` indicating whether the two dtypes are to be considered equivalent.
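A minimal sketch of the equivalence rules (illustrative results based on the checks above):

from meerschaum.utils.dtypes import are_dtypes_equal

are_dtypes_equal('int64[pyarrow]', 'Int32')                # True: both integer-like
are_dtypes_equal('datetime64[ns, UTC]', 'datetime')        # True: both datetime-like
are_dtypes_equal('float64', 'int64')                       # False
are_dtypes_equal({'a': 'Int64'}, {'a': 'int64[pyarrow]'})  # True: compared column by column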
246def is_dtype_numeric(dtype: str) -> bool: 247 """ 248 Determine whether a given `dtype` string 249 should be considered compatible with the Meerschaum dtype `numeric`. 250 251 Parameters 252 ---------- 253 dtype: str 254 The pandas-like dtype string. 255 256 Returns 257 ------- 258 A bool indicating the dtype is compatible with `numeric`. 259 """ 260 dtype_lower = dtype.lower() 261 262 acceptable_substrings = ('numeric', 'float', 'double', 'int') 263 for substring in acceptable_substrings: 264 if substring in dtype_lower: 265 return True 266 267 return False
Determine whether a given `dtype` string should be considered compatible with the Meerschaum dtype `numeric`.
Parameters
- dtype (str): The pandas-like dtype string.
Returns
- A bool indicating the dtype is compatible with `numeric`.
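For example (a substring check, per the source above):

from meerschaum.utils.dtypes import is_dtype_numeric

is_dtype_numeric('float64')           # True ('float' substring)
is_dtype_numeric('int32[pyarrow]')    # True ('int' substring)
is_dtype_numeric('numeric[10,2]')     # True ('numeric' substring)
is_dtype_numeric('string')            # False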
270def attempt_cast_to_numeric( 271 value: Any, 272 quantize: bool = False, 273 precision: Optional[int] = None, 274 scale: Optional[int] = None, 275)-> Any: 276 """ 277 Given a value, attempt to coerce it into a numeric (Decimal). 278 279 Parameters 280 ---------- 281 value: Any 282 The value to be cast to a Decimal. 283 284 quantize: bool, default False 285 If `True`, quantize the decimal to the specified precision and scale. 286 287 precision: Optional[int], default None 288 If `quantize` is `True`, use this precision. 289 290 scale: Optional[int], default None 291 If `quantize` is `True`, use this scale. 292 293 Returns 294 ------- 295 A `Decimal` if possible, or `value`. 296 """ 297 if isinstance(value, Decimal): 298 if quantize and precision and scale: 299 return quantize_decimal(value, precision, scale) 300 return value 301 try: 302 if value_is_null(value): 303 return Decimal('NaN') 304 305 dec = Decimal(str(value)) 306 if not quantize or not precision or not scale: 307 return dec 308 return quantize_decimal(dec, precision, scale) 309 except Exception: 310 return value
Given a value, attempt to coerce it into a numeric (Decimal).
Parameters
- value (Any): The value to be cast to a Decimal.
- quantize (bool, default False): If `True`, quantize the decimal to the specified precision and scale.
- precision (Optional[int], default None): If `quantize` is `True`, use this precision.
- scale (Optional[int], default None): If `quantize` is `True`, use this scale.
Returns
- A `Decimal` if possible, or `value`.
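A short illustrative sketch:

from decimal import Decimal
from meerschaum.utils.dtypes import attempt_cast_to_numeric

attempt_cast_to_numeric('1.5')    # Decimal('1.5')
attempt_cast_to_numeric(None)     # Decimal('NaN') for null-like values
attempt_cast_to_numeric('abc')    # 'abc' (returned unchanged on failure)
attempt_cast_to_numeric('1.005', quantize=True, precision=5, scale=2)  # Decimal('1.01')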
313def attempt_cast_to_uuid(value: Any) -> Any: 314 """ 315 Given a value, attempt to coerce it into a UUID (`uuid4`). 316 """ 317 if isinstance(value, uuid.UUID): 318 return value 319 try: 320 return ( 321 uuid.UUID(str(value)) 322 if not value_is_null(value) 323 else None 324 ) 325 except Exception: 326 return value
Given a value, attempt to coerce it into a UUID (`uuid4`).
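For example (illustrative):

from meerschaum.utils.dtypes import attempt_cast_to_uuid

attempt_cast_to_uuid('12345678-1234-5678-1234-567812345678')  # UUID('12345678-1234-5678-1234-567812345678')
attempt_cast_to_uuid('not-a-uuid')   # 'not-a-uuid' (returned unchanged)
attempt_cast_to_uuid(None)           # None (null-like values)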
329def attempt_cast_to_bytes(value: Any) -> Any: 330 """ 331 Given a value, attempt to coerce it into a bytestring. 332 """ 333 if isinstance(value, bytes): 334 return value 335 try: 336 return ( 337 deserialize_bytes_string(str(value)) 338 if not value_is_null(value) 339 else None 340 ) 341 except Exception: 342 return value
Given a value, attempt to coerce it into a bytestring.
345def attempt_cast_to_geometry(value: Any) -> Any: 346 """ 347 Given a value, attempt to coerce it into a `shapely` (`geometry`) object. 348 """ 349 shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import( 350 'shapely', 351 'shapely.wkt', 352 'shapely.wkb', 353 lazy=False, 354 ) 355 if 'shapely' in str(type(value)): 356 return value 357 358 if isinstance(value, (dict, list)): 359 try: 360 return shapely.from_geojson(json.dumps(value)) 361 except Exception: 362 return value 363 364 value_is_wkt = geometry_is_wkt(value) 365 if value_is_wkt is None: 366 return value 367 368 try: 369 return ( 370 shapely_wkt.loads(value) 371 if value_is_wkt 372 else shapely_wkb.loads(value) 373 ) 374 except Exception: 375 return value
Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
378def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]: 379 """ 380 Determine whether an input value should be treated as WKT or WKB geometry data. 381 382 Parameters 383 ---------- 384 value: Union[str, bytes] 385 The input data to be parsed into geometry data. 386 387 Returns 388 ------- 389 A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB). 390 Return `None` if `value` should be parsed as neither. 391 """ 392 import re 393 if not isinstance(value, (str, bytes)): 394 return None 395 396 if isinstance(value, bytes): 397 return False 398 399 wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$' 400 if re.match(wkt_pattern, value, re.IGNORECASE): 401 return True 402 403 if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0: 404 return False 405 406 return None
Determine whether an input value should be treated as WKT or WKB geometry data.
Parameters
- value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
- A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
- Return `None` if `value` should be parsed as neither.
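For example (illustrative values; the hex string is the WKB for POINT (1 2)):

from meerschaum.utils.dtypes import geometry_is_wkt

geometry_is_wkt('POINT (1 2)')     # True: matches the WKT pattern
geometry_is_wkt('0101000000000000000000f03f0000000000000040')  # False: even-length hex, treated as WKB
geometry_is_wkt(b'\x01\x01')       # False: raw bytes are always treated as WKB
geometry_is_wkt('not geometry')    # None: neither WKT nor WKB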
409def value_is_null(value: Any) -> bool: 410 """ 411 Determine if a value is a null-like string. 412 """ 413 return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
Determine if a value is a null-like string.
416def none_if_null(value: Any) -> Any: 417 """ 418 Return `None` if a value is a null-like string. 419 """ 420 return (None if value_is_null(value) else value)
Return `None` if a value is a null-like string.
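For example:

from meerschaum.utils.dtypes import value_is_null, none_if_null

value_is_null('<NA>')    # True
value_is_null(0)         # False ('0' is not a null-like string)
none_if_null('NaT')      # None
none_if_null('hello')    # 'hello'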
423def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal: 424 """ 425 Quantize a given `Decimal` to a known scale and precision. 426 427 Parameters 428 ---------- 429 x: Decimal 430 The `Decimal` to be quantized. 431 432 precision: int 433 The total number of significant digits. 434 435 scale: int 436 The number of significant digits after the decimal point. 437 438 Returns 439 ------- 440 A `Decimal` quantized to the specified scale and precision. 441 """ 442 precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale)) 443 try: 444 return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP) 445 except InvalidOperation: 446 pass 447 448 raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
Quantize a given `Decimal` to a known scale and precision.
Parameters
- x (Decimal): The `Decimal` to be quantized.
- precision (int): The total number of significant digits.
- scale (int): The number of significant digits after the decimal point.
Returns
- A `Decimal` quantized to the specified scale and precision.
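A short sketch (illustrative):

from decimal import Decimal
from meerschaum.utils.dtypes import quantize_decimal

quantize_decimal(Decimal('123.456'), 6, 2)   # Decimal('123.46')
quantize_decimal(Decimal('0.005'), 4, 2)     # Decimal('0.01') (ROUND_HALF_UP)
# quantize_decimal(Decimal('12345.678'), 4, 2)  # raises ValueError: too many digits for precision=4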
451def serialize_decimal( 452 x: Any, 453 quantize: bool = False, 454 precision: Optional[int] = None, 455 scale: Optional[int] = None, 456) -> Any: 457 """ 458 Return a quantized string of an input decimal. 459 460 Parameters 461 ---------- 462 x: Any 463 The potential decimal to be serialized. 464 465 quantize: bool, default False 466 If `True`, quantize the incoming Decimal to the specified scale and precision 467 before serialization. 468 469 precision: Optional[int], default None 470 The precision of the decimal to be quantized. 471 472 scale: Optional[int], default None 473 The scale of the decimal to be quantized. 474 475 Returns 476 ------- 477 A string of the input decimal or the input if not a Decimal. 478 """ 479 if not isinstance(x, Decimal): 480 return x 481 482 if value_is_null(x): 483 return None 484 485 if quantize and scale and precision: 486 x = quantize_decimal(x, precision, scale) 487 488 return f"{x:f}"
Return a quantized string of an input decimal.
Parameters
- x (Any): The potential decimal to be serialized.
- quantize (bool, default False): If `True`, quantize the incoming Decimal to the specified scale and precision before serialization.
- precision (Optional[int], default None): The precision of the decimal to be quantized.
- scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
- A string of the input decimal or the input if not a Decimal.
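For example (illustrative):

from decimal import Decimal
from meerschaum.utils.dtypes import serialize_decimal

serialize_decimal(Decimal('1E+2'))      # '100' (no scientific notation)
serialize_decimal(Decimal('NaN'))       # None (null-like)
serialize_decimal('not a decimal')      # returned unchanged
serialize_decimal(Decimal('123.456'), quantize=True, precision=6, scale=2)  # '123.46'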
491def coerce_timezone( 492 dt: Any, 493 strip_utc: bool = False, 494) -> Any: 495 """ 496 Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`, 497 return a UTC timestamp (strip timezone if `strip_utc` is `True`. 498 """ 499 if dt is None: 500 return None 501 502 if isinstance(dt, int): 503 return dt 504 505 if isinstance(dt, str): 506 dateutil_parser = mrsm.attempt_import('dateutil.parser') 507 try: 508 dt = dateutil_parser.parse(dt) 509 except Exception: 510 return dt 511 512 dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__') 513 if dt_is_series: 514 pandas = mrsm.attempt_import('pandas', lazy=False) 515 516 if ( 517 pandas.api.types.is_datetime64_any_dtype(dt) and ( 518 (dt.dt.tz is not None and not strip_utc) 519 or 520 (dt.dt.tz is None and strip_utc) 521 ) 522 ): 523 return dt 524 525 dt_series = to_datetime(dt, coerce_utc=False) 526 if dt_series.dt.tz is None: 527 dt_series = dt_series.dt.tz_localize(timezone.utc) 528 if strip_utc: 529 try: 530 if dt_series.dt.tz is not None: 531 dt_series = dt_series.dt.tz_localize(None) 532 except Exception: 533 pass 534 535 return dt_series 536 537 if dt.tzinfo is None: 538 if strip_utc: 539 return dt 540 return dt.replace(tzinfo=timezone.utc) 541 542 utc_dt = dt.astimezone(timezone.utc) 543 if strip_utc: 544 return utc_dt.replace(tzinfo=None) 545 return utc_dt
Given a `datetime`, pandas `Timestamp`, or `Series` of `Timestamp`, return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
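For example (illustrative):

from datetime import datetime, timezone
from meerschaum.utils.dtypes import coerce_timezone

naive = datetime(2024, 1, 1, 12, 0)
coerce_timezone(naive)                  # datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
aware = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
coerce_timezone(aware, strip_utc=True)  # datetime(2024, 1, 1, 12, 0) (tzinfo removed)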
548def to_datetime( 549 dt_val: Any, 550 as_pydatetime: bool = False, 551 coerce_utc: bool = True, 552 precision_unit: Optional[str] = None, 553) -> Any: 554 """ 555 Wrap `pd.to_datetime()` and add support for out-of-bounds values. 556 557 Parameters 558 ---------- 559 dt_val: Any 560 The value to coerce to Pandas Timestamps. 561 562 as_pydatetime: bool, default False 563 If `True`, return a Python datetime object. 564 565 coerce_utc: bool, default True 566 If `True`, ensure the value has UTC tzinfo. 567 568 precision_unit: Optional[str], default None 569 If provided, enforce the provided precision unit. 570 """ 571 pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False) 572 is_dask = 'dask' in getattr(dt_val, '__module__', '') 573 dd = mrsm.attempt_import('dask.dataframe') if is_dask else None 574 dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__') 575 pd = pandas if dd is None else dd 576 enforce_precision = precision_unit is not None 577 precision_unit = precision_unit or 'microsecond' 578 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 579 precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None) 580 if not precision_abbreviation: 581 raise ValueError(f"Invalid precision '{precision_unit}'.") 582 583 def parse(x: Any) -> Any: 584 try: 585 return dateutil_parser.parse(x) 586 except Exception: 587 return x 588 589 def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool: 590 dtype_check_against = ( 591 f"datetime64[{precision_abbreviation}, UTC]" 592 if with_utc 593 else f"datetime64[{precision_abbreviation}]" 594 ) 595 return ( 596 dtype_to_check == dtype_check_against 597 if enforce_precision 598 else ( 599 dtype_to_check.startswith('datetime64[') 600 and ( 601 ('utc' in dtype_to_check.lower()) 602 if with_utc 603 else ('utc' not in dtype_to_check.lower()) 604 ) 605 ) 606 ) 607 608 if isinstance(dt_val, pd.Timestamp): 609 dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime() 610 return ( 611 coerce_timezone(dt_val_to_return) 612 if coerce_utc 613 else dt_val_to_return 614 ) 615 616 if dt_is_series: 617 changed_tz = False 618 original_tz = None 619 dtype = str(getattr(dt_val, 'dtype', 'object')) 620 if ( 621 are_dtypes_equal(dtype, 'datetime') 622 and 'utc' not in dtype.lower() 623 and hasattr(dt_val, 'dt') 624 ): 625 original_tz = dt_val.dt.tz 626 dt_val = dt_val.dt.tz_localize(timezone.utc) 627 changed_tz = True 628 dtype = str(getattr(dt_val, 'dtype', 'object')) 629 try: 630 new_dt_series = ( 631 dt_val 632 if check_dtype(dtype, with_utc=True) 633 else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]") 634 ) 635 except pd.errors.OutOfBoundsDatetime: 636 try: 637 next_precision = get_next_precision_unit(true_precision_unit) 638 next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision] 639 new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]") 640 except Exception: 641 new_dt_series = None 642 except ValueError: 643 new_dt_series = None 644 except TypeError: 645 try: 646 new_dt_series = ( 647 new_dt_series 648 if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False) 649 else dt_val.astype(f"datetime64[{precision_abbreviation}]") 650 ) 651 except Exception: 652 new_dt_series = None 653 654 if new_dt_series is None: 655 new_dt_series = dt_val.apply(lambda x: parse(str(x))) 656 657 if coerce_utc: 658 return coerce_timezone(new_dt_series) 659 660 if changed_tz: 
661 new_dt_series = new_dt_series.dt.tz_localize(original_tz) 662 return new_dt_series 663 664 try: 665 new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601') 666 if new_dt_val.unit != precision_abbreviation: 667 new_dt_val = new_dt_val.as_unit(precision_abbreviation) 668 if as_pydatetime: 669 return new_dt_val.to_pydatetime() 670 return new_dt_val 671 except (pd.errors.OutOfBoundsDatetime, ValueError): 672 pass 673 674 new_dt_val = parse(dt_val) 675 if not coerce_utc: 676 return new_dt_val 677 return coerce_timezone(new_dt_val)
Wrap `pd.to_datetime()` and add support for out-of-bounds values.
Parameters
- dt_val (Any): The value to coerce to Pandas Timestamps.
- as_pydatetime (bool, default False): If `True`, return a Python datetime object.
- coerce_utc (bool, default True): If `True`, ensure the value has UTC tzinfo.
- precision_unit (Optional[str], default None): If provided, enforce the given precision unit.
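A brief sketch (illustrative; the exact precision of the result depends on the installed pandas version):

from meerschaum.utils.dtypes import to_datetime

to_datetime('2024-01-01T12:00:00')                      # Timestamp('2024-01-01 12:00:00+0000', tz='UTC')
to_datetime('2024-01-01', as_pydatetime=True)           # datetime(2024, 1, 1, tzinfo=timezone.utc)
to_datetime('2024-01-01T12:00:00', precision_unit='s')  # second-precision UTC Timestamp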
680def serialize_bytes(data: bytes) -> str: 681 """ 682 Return the given bytes as a base64-encoded string. 683 """ 684 import base64 685 if not isinstance(data, bytes) and value_is_null(data): 686 return data 687 return base64.b64encode(data).decode('utf-8')
Return the given bytes as a base64-encoded string.
690def serialize_geometry( 691 geom: Any, 692 geometry_format: str = 'wkb_hex', 693 srid: Optional[int] = None, 694) -> Union[str, Dict[str, Any], None]: 695 """ 696 Serialize geometry data as a hex-encoded well-known-binary string. 697 698 Parameters 699 ---------- 700 geom: Any 701 The potential geometry data to be serialized. 702 703 geometry_format: str, default 'wkb_hex' 704 The serialization format for geometry data. 705 Accepted formats are `wkb_hex` (well-known binary hex string), 706 `wkt` (well-known text), and `geojson`. 707 708 srid: Optional[int], default None 709 If provided, use this as the source CRS when serializing to GeoJSON. 710 711 Returns 712 ------- 713 A string containing the geometry data. 714 """ 715 if value_is_null(geom): 716 return None 717 shapely, shapely_ops, pyproj = mrsm.attempt_import( 718 'shapely', 'shapely.ops', 'pyproj', 719 lazy=False, 720 ) 721 if geometry_format == 'geojson': 722 if srid: 723 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True) 724 geom = shapely_ops.transform(transformer.transform, geom) 725 geojson_str = shapely.to_geojson(geom) 726 return json.loads(geojson_str) 727 728 if hasattr(geom, 'wkb_hex'): 729 if geometry_format == "wkb_hex": 730 return shapely.to_wkb(geom, hex=True, include_srid=True) 731 return shapely.to_wkt(geom) 732 733 return str(geom)
Serialize geometry data, by default as a hex-encoded well-known-binary (WKB) string.
Parameters
- geom (Any): The potential geometry data to be serialized.
- geometry_format (str, default 'wkb_hex'): The serialization format for geometry data. Accepted formats are `wkb_hex` (well-known binary hex string), `wkt` (well-known text), and `geojson`.
- srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON.
Returns
- A string (or a GeoJSON dict) containing the geometry data.
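A minimal sketch, assuming `shapely` (and `pyproj`) are available:

from shapely.geometry import Point
from meerschaum.utils.dtypes import serialize_geometry

point = Point(1.0, 2.0)
serialize_geometry(point)                             # hex-encoded WKB string
serialize_geometry(point, geometry_format='wkt')      # 'POINT (1 2)'
serialize_geometry(point, geometry_format='geojson')  # {'type': 'Point', 'coordinates': [1.0, 2.0]}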
736def deserialize_geometry(geom_wkb: Union[str, bytes]): 737 """ 738 Deserialize a WKB string into a shapely geometry object. 739 """ 740 shapely = mrsm.attempt_import('shapely', lazy=False) 741 return shapely.wkb.loads(geom_wkb)
Deserialize a WKB string into a shapely geometry object.
744def project_geometry(geom, srid: int, to_srid: int = 4326): 745 """ 746 Project a shapely geometry object to a new CRS (SRID). 747 """ 748 pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False) 749 transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True) 750 return shapely_ops.transform(transformer.transform, geom)
Project a shapely geometry object to a new CRS (SRID).
753def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]: 754 """ 755 Given a serialized ASCII string of bytes data, return the original bytes. 756 The input data may either be base64- or hex-encoded. 757 758 Parameters 759 ---------- 760 data: Optional[str] 761 The string to be deserialized into bytes. 762 May be base64- or hex-encoded (prefixed with `'\\x'`). 763 764 force_hex: bool = False 765 If `True`, treat the input string as hex-encoded. 766 If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`. 767 This will still strip the leading `'\\x'` prefix if present. 768 769 Returns 770 ------- 771 The original bytes used to produce the encoded string `data`. 772 """ 773 if not isinstance(data, str) and value_is_null(data): 774 return data 775 776 import binascii 777 import base64 778 779 is_hex = force_hex or data.startswith('\\x') 780 781 if is_hex: 782 if data.startswith('\\x'): 783 data = data[2:] 784 return binascii.unhexlify(data) 785 786 return base64.b64decode(data)
Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.
Parameters
- data (Optional[str]): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with `'\x'`).
- force_hex (bool, default False): If `True`, treat the input string as hex-encoded. If `data` does not begin with the prefix `'\x'`, set `force_hex` to `True`. This will still strip the leading `'\x'` prefix if present.
Returns
- The original bytes used to produce the encoded string `data`.
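Both encodings round-trip with the serializers in this module (a short sketch; `encode_bytes_for_bytea` is documented below):

from meerschaum.utils.dtypes import (
    serialize_bytes,
    encode_bytes_for_bytea,
    deserialize_bytes_string,
)

data = b'hello'
deserialize_bytes_string(serialize_bytes(data))           # b'hello' (base64 branch)
deserialize_bytes_string(encode_bytes_for_bytea(data))    # b'hello' (detects the '\x' prefix)
deserialize_bytes_string('68656c6c6f', force_hex=True)    # b'hello' (hex without a prefix)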
789def deserialize_base64(data: str) -> bytes: 790 """ 791 Return the original bytestring from the given base64-encoded string. 792 """ 793 import base64 794 return base64.b64decode(data)
Return the original bytestring from the given base64-encoded string.
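For example:

>>> from meerschaum.utils.dtypes import deserialize_base64
>>> deserialize_base64('aGVsbG8=')
b'hello'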
797def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]: 798 """ 799 Return the given bytes as a hex string for PostgreSQL's `BYTEA` type. 800 """ 801 import binascii 802 if not isinstance(data, bytes) and value_is_null(data): 803 return data 804 return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
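For example (the prefix is the literal two characters `\x`):

>>> from meerschaum.utils.dtypes import encode_bytes_for_bytea
>>> encode_bytes_for_bytea(b'hello')
'\\x68656c6c6f'
>>> encode_bytes_for_bytea(b'hello', with_prefix=False)
'68656c6c6f'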
807def serialize_datetime(dt: datetime) -> Union[str, None]: 808 """ 809 Serialize a datetime object into JSON (ISO format string). 810 811 Examples 812 -------- 813 >>> import json 814 >>> from datetime import datetime 815 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 816 '{"a": "2022-01-01T00:00:00Z"}' 817 818 """ 819 if not hasattr(dt, 'isoformat'): 820 return None 821 822 tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else '' 823 return dt.isoformat() + tz_suffix
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
826def serialize_date(d: date) -> Union[str, None]: 827 """ 828 Serialize a date object into its ISO representation. 829 """ 830 return d.isoformat() if hasattr(d, 'isoformat') else None
Serialize a date object into its ISO representation.
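For example:

>>> from datetime import date
>>> from meerschaum.utils.dtypes import serialize_date
>>> serialize_date(date(2024, 1, 1))
'2024-01-01'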
833def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]: 834 """ 835 Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc. 836 837 Parameters 838 ---------- 839 x: Any 840 The value to serialize. 841 842 default_to_str: bool, default True 843 If `True`, return a string of `x` if x is not a designated type. 844 Otherwise return x. 845 846 Returns 847 ------- 848 A serialized version of x, or x. 849 """ 850 if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)): 851 return x.meta 852 853 if hasattr(x, 'tzinfo'): 854 return serialize_datetime(x) 855 856 if hasattr(x, 'isoformat'): 857 return serialize_date(x) 858 859 if isinstance(x, bytes): 860 return serialize_bytes(x) 861 862 if isinstance(x, Decimal): 863 return serialize_decimal(x) 864 865 if 'shapely' in str(type(x)): 866 return serialize_geometry(x) 867 868 if value_is_null(x): 869 return None 870 871 if isinstance(x, (dict, list, tuple)): 872 return json.dumps(x, default=json_serialize_value, separators=(',', ':')) 873 874 return str(x) if default_to_str else x
Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
Parameters
- x (Any): The value to serialize.
- default_to_str (bool, default True): If `True`, return a string of `x` if `x` is not a designated type. Otherwise return `x`.
Returns
- A serialized version of x, or x.
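A typical use is passing it as the `default` handler to `json.dumps` (output shown for datetime and bytes values, which dispatch to the serializers above):

>>> import json
>>> from datetime import datetime
>>> from meerschaum.utils.dtypes import json_serialize_value
>>> json.dumps({'dt': datetime(2022, 1, 1), 'data': b'hi'}, default=json_serialize_value)
'{"dt": "2022-01-01T00:00:00Z", "data": "aGk="}'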
877def get_geometry_type_srid( 878 dtype: str = 'geometry', 879 default_type: str = 'geometry', 880 default_srid: int = 4326, 881) -> Union[Tuple[str, int], Tuple[str, None]]: 882 """ 883 Given the specified geometry `dtype`, return a tuple in the form (type, SRID). 884 885 Parameters 886 ---------- 887 dtype: Optional[str], default None 888 Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). 889 You may specify a supported `shapely` geometry type and an SRID in the dtype modifier: 890 891 - `Point` 892 - `LineString` 893 - `LinearRing` 894 - `Polygon` 895 - `MultiPoint` 896 - `MultiLineString` 897 - `MultiPolygon` 898 - `GeometryCollection` 899 900 Returns 901 ------- 902 A tuple in the form (type, SRID). 903 Defaults to `(default_type, default_srid)`. 904 905 Examples 906 -------- 907 >>> from meerschaum.utils.dtypes import get_geometry_type_srid 908 >>> get_geometry_type_srid() 909 ('geometry', 4326) 910 >>> get_geometry_type_srid('geometry[]') 911 ('geometry', 4326) 912 >>> get_geometry_type_srid('geometry[Point, 0]') 913 ('Point', 0) 914 >>> get_geometry_type_srid('geometry[0, Point]') 915 ('Point', 0) 916 >>> get_geometry_type_srid('geometry[0]') 917 ('geometry', 0) 918 >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]') 919 ('MultiLineString', 4326) 920 >>> get_geometry_type_srid('geography') 921 ('geometry', 4326) 922 >>> get_geometry_type_srid('geography[POINT]') 923 ('Point', 4376) 924 """ 925 from meerschaum.utils.misc import is_int 926 ### NOTE: PostGIS syntax must also be parsed. 927 dtype = dtype.replace('(', '[').replace(')', ']') 928 bare_dtype = dtype.split('[', maxsplit=1)[0] 929 modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']') 930 if not modifier: 931 return default_type, default_srid 932 933 parts = [ 934 part.split('=')[-1].strip() 935 for part in modifier.split(',') 936 ] 937 parts_casted = [ 938 ( 939 int(part) 940 if is_int(part) 941 else part 942 ) 943 for part in parts 944 ] 945 946 srid = default_srid 947 geometry_type = default_type 948 949 for part in parts_casted: 950 if isinstance(part, int): 951 srid = part 952 break 953 954 for part in parts_casted: 955 if isinstance(part, str): 956 geometry_type = part 957 break 958 959 return geometry_type, srid
Given the specified geometry `dtype`, return a tuple in the form (type, SRID).
Parameters
- dtype (str, default 'geometry'): Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`). You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
  - `Point`
  - `LineString`
  - `LinearRing`
  - `Polygon`
  - `MultiPoint`
  - `MultiLineString`
  - `MultiPolygon`
  - `GeometryCollection`
Returns
- A tuple in the form (type, SRID). Defaults to `(default_type, default_srid)`.
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 4326)
>>> get_geometry_type_srid('geometry[]')
('geometry', 4326)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MultiLineString', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 4326)
>>> get_geometry_type_srid('geography[POINT]')
('Point', 4326)
962def get_current_timestamp( 963 precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'], 964 precision_interval: int = 1, 965 round_to: str = 'down', 966 as_pandas: bool = False, 967 as_int: bool = False, 968 _now: Union[datetime, int, None] = None, 969) -> 'Union[datetime, pd.Timestamp, int]': 970 """ 971 Return the current UTC timestamp to nanosecond precision. 972 973 Parameters 974 ---------- 975 precision_unit: str, default 'us' 976 The precision of the timestamp to be returned. 977 Valid values are the following: 978 - `ns` / `nanosecond` 979 - `us` / `microsecond` 980 - `ms` / `millisecond` 981 - `s` / `sec` / `second` 982 - `m` / `min` / `minute` 983 - `h` / `hr` / `hour` 984 - `d` / `day` 985 986 precision_interval: int, default 1 987 Round the timestamp to the `precision_interval` units. 988 For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals. 989 Note: `precision_interval` must be 1 when `precision='nanosecond'`. 990 991 round_to: str, default 'down' 992 The direction to which to round the timestamp. 993 Available options are `down`, `up`, and `closest`. 994 995 as_pandas: bool, default False 996 If `True`, return a Pandas Timestamp. 997 This is always true if `unit` is `nanosecond`. 998 999 as_int: bool, default False 1000 If `True`, return the timestamp to an integer. 1001 Overrides `as_pandas`. 1002 1003 Returns 1004 ------- 1005 A Pandas Timestamp, datetime object, or integer with precision to the provided unit. 1006 1007 Examples 1008 -------- 1009 >>> get_current_timestamp('ns') 1010 Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC') 1011 >>> get_current_timestamp('ms') 1012 Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC') 1013 """ 1014 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1015 if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS: 1016 from meerschaum.utils.misc import items_str 1017 raise ValueError( 1018 f"Unknown precision unit '{precision_unit}'. " 1019 "Accepted values are " 1020 f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}." 1021 ) 1022 1023 if not as_int: 1024 as_pandas = as_pandas or true_precision_unit == 'nanosecond' 1025 pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None 1026 1027 if true_precision_unit == 'nanosecond': 1028 if precision_interval != 1: 1029 warn("`precision_interval` must be 1 for nanosecond precision.") 1030 now_ts = time.time_ns() if not isinstance(_now, int) else _now 1031 if as_int: 1032 return now_ts 1033 return pd.to_datetime(now_ts, unit='ns', utc=True) 1034 1035 now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now 1036 delta = timedelta(**{true_precision_unit + 's': precision_interval}) 1037 rounded_now = round_time(now, delta, to=round_to) 1038 1039 if as_int: 1040 return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit]) 1041 1042 ts_val = ( 1043 pd.to_datetime(rounded_now, utc=True) 1044 if as_pandas 1045 else rounded_now 1046 ) 1047 1048 if not as_pandas: 1049 return ts_val 1050 1051 as_unit_precisions = ('microsecond', 'millisecond', 'second') 1052 if true_precision_unit not in as_unit_precisions: 1053 return ts_val 1054 1055 return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
Return the current UTC timestamp, rounded to the specified precision.
Parameters
- precision_unit (str, default 'us'): The precision of the timestamp to be returned. Valid values are the following:
  - `ns` / `nanosecond`
  - `us` / `microsecond`
  - `ms` / `millisecond`
  - `s` / `sec` / `second`
  - `m` / `min` / `minute`
  - `h` / `hr` / `hour`
  - `d` / `day`
- precision_interval (int, default 1): Round the timestamp to `precision_interval` units. For example, `precision_unit='minute'` and `precision_interval=15` will round to 15-minute intervals. Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`.
- round_to (str, default 'down'): The direction to which to round the timestamp. Available options are `down`, `up`, and `closest`.
- as_pandas (bool, default False): If `True`, return a Pandas Timestamp. This is always true if `precision_unit` is `nanosecond`.
- as_int (bool, default False): If `True`, return the timestamp as an integer. Overrides `as_pandas`.
Returns
- A Pandas Timestamp, datetime object, or integer at the provided precision unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms')
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
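A short sketch of interval rounding and integer output (results depend on the current time, so no outputs are shown):

from meerschaum.utils.dtypes import get_current_timestamp

# Round the current time down to the nearest 15-minute boundary.
ts = get_current_timestamp('minute', precision_interval=15)

# Integer output is scaled by the unit (here: whole minutes since the Unix epoch).
minutes_since_epoch = get_current_timestamp('minute', as_int=True)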
1058def is_dtype_special(type_: str) -> bool: 1059 """ 1060 Return whether a dtype should be treated as a special Meerschaum dtype. 1061 This is not the same as a Meerschaum alias. 1062 """ 1063 true_type = MRSM_ALIAS_DTYPES.get(type_, type_) 1064 if true_type in ( 1065 'uuid', 1066 'json', 1067 'bytes', 1068 'numeric', 1069 'datetime', 1070 'geometry', 1071 'geography', 1072 'date', 1073 'bool', 1074 ): 1075 return True 1076 1077 if are_dtypes_equal(true_type, 'datetime'): 1078 return True 1079 1080 if are_dtypes_equal(true_type, 'date'): 1081 return True 1082 1083 if true_type.startswith('numeric'): 1084 return True 1085 1086 if true_type.startswith('bool'): 1087 return True 1088 1089 if true_type.startswith('geometry'): 1090 return True 1091 1092 if true_type.startswith('geography'): 1093 return True 1094 1095 return False
Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.
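For example:

>>> from meerschaum.utils.dtypes import is_dtype_special
>>> is_dtype_special('numeric')
True
>>> is_dtype_special('decimal')
True
>>> is_dtype_special('datetime64[us, UTC]')
True
>>> is_dtype_special('int')
False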
1098def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str: 1099 """ 1100 Get the next precision string in order of value. 1101 1102 Parameters 1103 ---------- 1104 precision_unit: str 1105 The precision string (`'nanosecond'`, `'ms'`, etc.). 1106 1107 decrease: bool, defaul True 1108 If `True` return the precision unit which is lower (e.g. `nanosecond` -> `millisecond`). 1109 If `False`, return the precision unit which is higher. 1110 1111 Returns 1112 ------- 1113 A `precision` string which is lower or higher than the given precision unit. 1114 1115 Examples 1116 -------- 1117 >>> get_next_precision_unit('nanosecond') 1118 'microsecond' 1119 >>> get_next_precision_unit('ms') 1120 'second' 1121 >>> get_next_precision_unit('hour', decrease=False) 1122 'minute' 1123 """ 1124 true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit) 1125 precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None) 1126 if not precision_scalar: 1127 raise ValueError(f"Invalid precision unit '{precision_unit}'.") 1128 1129 precisions = sorted( 1130 list(MRSM_PRECISION_UNITS_SCALARS), 1131 key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p] 1132 ) 1133 1134 precision_index = precisions.index(true_precision_unit) 1135 new_precision_index = precision_index + (-1 if decrease else 1) 1136 if new_precision_index < 0 or new_precision_index >= len(precisions): 1137 raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.") 1138 1139 return precisions[new_precision_index]
Get the next precision string in order of value.
Parameters
- precision_unit (str): The precision string (`'nanosecond'`, `'ms'`, etc.).
- decrease (bool, default True): If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`). If `False`, return the precision unit which is higher.
Returns
- A `precision` string which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
1142def round_time( 1143 dt: Optional[datetime] = None, 1144 date_delta: Optional[timedelta] = None, 1145 to: 'str' = 'down' 1146) -> datetime: 1147 """ 1148 Round a datetime object to a multiple of a timedelta. 1149 1150 Parameters 1151 ---------- 1152 dt: Optional[datetime], default None 1153 If `None`, grab the current UTC datetime. 1154 1155 date_delta: Optional[timedelta], default None 1156 If `None`, use a delta of 1 minute. 1157 1158 to: 'str', default 'down' 1159 Available options are `'up'`, `'down'`, and `'closest'`. 1160 1161 Returns 1162 ------- 1163 A rounded `datetime` object. 1164 1165 Examples 1166 -------- 1167 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 1168 datetime.datetime(2022, 1, 1, 12, 15) 1169 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 1170 datetime.datetime(2022, 1, 1, 12, 16) 1171 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 1172 datetime.datetime(2022, 1, 1, 12, 0) 1173 >>> round_time( 1174 ... datetime(2022, 1, 1, 12, 15, 57, 200), 1175 ... timedelta(hours=1), 1176 ... to = 'closest' 1177 ... ) 1178 datetime.datetime(2022, 1, 1, 12, 0) 1179 >>> round_time( 1180 ... datetime(2022, 1, 1, 12, 45, 57, 200), 1181 ... datetime.timedelta(hours=1), 1182 ... to = 'closest' 1183 ... ) 1184 datetime.datetime(2022, 1, 1, 13, 0) 1185 1186 """ 1187 from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP 1188 if date_delta is None: 1189 date_delta = timedelta(minutes=1) 1190 1191 if dt is None: 1192 dt = datetime.now(timezone.utc).replace(tzinfo=None) 1193 1194 def get_total_microseconds(td: timedelta) -> int: 1195 return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds 1196 1197 round_to_microseconds = get_total_microseconds(date_delta) 1198 if round_to_microseconds == 0: 1199 return dt 1200 1201 dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min 1202 dt_total_microseconds = get_total_microseconds(dt_delta_from_min) 1203 1204 dt_dec = Decimal(dt_total_microseconds) 1205 round_to_dec = Decimal(round_to_microseconds) 1206 1207 div = dt_dec / round_to_dec 1208 if to == 'down': 1209 num_intervals = div.to_integral_value(rounding=ROUND_DOWN) 1210 elif to == 'up': 1211 num_intervals = div.to_integral_value(rounding=ROUND_UP) 1212 else: 1213 num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP) 1214 1215 rounded_dt_total_microseconds = num_intervals * round_to_dec 1216 adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds 1217 1218 return dt + timedelta(microseconds=adjustment_microseconds)
Round a datetime object to a multiple of a timedelta.
Parameters
- dt (Optional[datetime], default None): If `None`, grab the current UTC datetime.
- date_delta (Optional[timedelta], default None): If `None`, use a delta of 1 minute.
- to (str, default 'down'): Available options are `'up'`, `'down'`, and `'closest'`.
Returns
- A rounded `datetime` object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 15, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 45, 57, 200),
...     timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)