meerschaum.utils.dtypes

Utility functions for working with data types.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with data types.
   7"""
   8
   9import traceback
  10import json
  11import uuid
  12import time
  13import struct
  14from datetime import timezone, datetime, date, timedelta
  15from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP
  16
  17import meerschaum as mrsm
  18from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple
  19from meerschaum.utils.warnings import warn
  20from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG
  21
  22MRSM_ALIAS_DTYPES: Dict[str, str] = {
  23    'decimal': 'numeric',
  24    'Decimal': 'numeric',
  25    'number': 'numeric',
  26    'jsonl': 'json',
  27    'JSON': 'json',
  28    'binary': 'bytes',
  29    'blob': 'bytes',
  30    'varbinary': 'bytes',
  31    'bytea': 'bytes',
  32    'guid': 'uuid',
  33    'UUID': 'uuid',
  34    'geom': 'geometry',
  35    'geog': 'geography',
  36    'boolean': 'bool',
  37    'day': 'date',
  38}
  39MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
  40    'json': 'object',
  41    'numeric': 'object',
  42    'geometry': 'object',
  43    'geography': 'object',
  44    'uuid': 'object',
  45    'date': 'date32[day][pyarrow]',
  46    'datetime': 'datetime64[us, UTC]',
  47    'bool': 'bool[pyarrow]',
  48    'int': 'int64[pyarrow]',
  49    'int8': 'int8[pyarrow]',
  50    'int16': 'int16[pyarrow]',
  51    'int32': 'int32[pyarrow]',
  52    'int64': 'int64[pyarrow]',
  53    'str': 'string',
  54    'bytes': 'binary[pyarrow]',
  55    None: 'object',
  56}
  57
  58MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
  59    'nanosecond': 1_000_000_000,
  60    'microsecond': 1_000_000,
  61    'millisecond': 1000,
  62    'second': 1,
  63    'minute': (1 / 60),
  64    'hour': (1 / 3600),
  65    'day': (1 / 86400),
  66}
  67
  68MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
  69    'ns': 'nanosecond',
  70    'us': 'microsecond',
  71    'ms': 'millisecond',
  72    's': 'second',
  73    'sec': 'second',
  74    'm': 'minute',
  75    'min': 'minute',
  76    'h': 'hour',
  77    'hr': 'hour',
  78    'd': 'day',
  79    'D': 'day',
  80}
  81MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
  82    'nanosecond': 'ns',
  83    'microsecond': 'us',
  84    'millisecond': 'ms',
  85    'second': 's',
  86    'minute': 'min',
  87    'hour': 'hr',
  88    'day': 'D',
  89}
  90
  91
  92def to_pandas_dtype(dtype: str) -> str:
  93    """
  94    Cast a supported Meerschaum dtype to a Pandas dtype.
  95    """
  96    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
  97    if known_dtype is not None:
  98        return known_dtype
  99
 100    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
 101    if alias_dtype is not None:
 102        return MRSM_PD_DTYPES[alias_dtype]
 103
 104    if dtype.startswith('numeric'):
 105        return MRSM_PD_DTYPES['numeric']
 106
 107    if dtype.startswith('geometry'):
 108        return MRSM_PD_DTYPES['geometry']
 109
 110    if dtype.startswith('geography'):
 111        return MRSM_PD_DTYPES['geography']
 112
 113    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
 114    ### treat it as a SQL db type.
 115    if dtype.split(' ')[0].isupper():
 116        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
 117        return get_pd_type_from_db_type(dtype)
 118
 119    from meerschaum.utils.packages import attempt_import
 120    _ = attempt_import('pyarrow', lazy=False)
 121    pandas = attempt_import('pandas', lazy=False)
 122
 123    try:
 124        return str(pandas.api.types.pandas_dtype(dtype))
 125    except Exception:
 126        warn(
 127            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
 128            + f"{traceback.format_exc()}",
 129            stack=False,
 130        )
 131    return 'object'
 132
 133
 134def are_dtypes_equal(
 135    ldtype: Union[str, Dict[str, str]],
 136    rdtype: Union[str, Dict[str, str]],
 137) -> bool:
 138    """
 139    Determine whether two dtype strings may be considered
 140    equivalent to avoid unnecessary conversions.
 141
 142    Parameters
 143    ----------
 144    ldtype: Union[str, Dict[str, str]]
 145        The left dtype to compare.
 146        May also provide a dtypes dictionary.
 147
 148    rdtype: Union[str, Dict[str, str]]
 149        The right dtype to compare.
 150        May also provide a dtypes dictionary.
 151
 152    Returns
 153    -------
 154    A `bool` indicating whether the two dtypes are to be considered equivalent.
 155    """
 156    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
 157        lkeys = sorted([str(k) for k in ldtype.keys()])
 158        rkeys = sorted([str(k) for k in rdtype.keys()])
 159        for lkey, rkey in zip(lkeys, rkeys):
 160            if lkey != rkey:
 161                return False
 162            ltype = ldtype[lkey]
 163            rtype = rdtype[rkey]
 164            if not are_dtypes_equal(ltype, rtype):
 165                return False
 166        return True
 167
 168    try:
 169        if ldtype == rdtype:
 170            return True
 171    except Exception:
 172        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
 173        return False
 174
 175    ### Sometimes pandas dtype objects are passed.
 176    ldtype = str(ldtype).split('[', maxsplit=1)[0]
 177    rdtype = str(rdtype).split('[', maxsplit=1)[0]
 178
 179    if ldtype in MRSM_ALIAS_DTYPES:
 180        ldtype = MRSM_ALIAS_DTYPES[ldtype]
 181
 182    if rdtype in MRSM_ALIAS_DTYPES:
 183        rdtype = MRSM_ALIAS_DTYPES[rdtype]
 184
 185    json_dtypes = ('json', 'object')
 186    if ldtype in json_dtypes and rdtype in json_dtypes:
 187        return True
 188
 189    numeric_dtypes = ('numeric', 'decimal', 'object')
 190    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
 191        return True
 192
 193    uuid_dtypes = ('uuid', 'object')
 194    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
 195        return True
 196
 197    bytes_dtypes = ('bytes', 'object', 'binary')
 198    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
 199        return True
 200
 201    geometry_dtypes = ('geometry', 'object', 'geography')
 202    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
 203        return True
 204
 205    if ldtype.lower() == rdtype.lower():
 206        return True
 207
 208    datetime_dtypes = ('datetime', 'timestamp')
 209    ldtype_found_dt_prefix = False
 210    rdtype_found_dt_prefix = False
 211    for dt_prefix in datetime_dtypes:
 212        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
 213        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
 214    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
 215        return True
 216
 217    string_dtypes = ('str', 'string', 'object')
 218    if ldtype in string_dtypes and rdtype in string_dtypes:
 219        return True
 220
 221    int_dtypes = (
 222        'int', 'int64', 'int32', 'int16', 'int8',
 223        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
 224    )
 225    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
 226        return True
 227
 228    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
 229    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
 230        return True
 231
 232    bool_dtypes = ('bool', 'boolean')
 233    if ldtype in bool_dtypes and rdtype in bool_dtypes:
 234        return True
 235
 236    date_dtypes = (
 237        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
 238        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
 239    )
 240    if ldtype in date_dtypes and rdtype in date_dtypes:
 241        return True
 242
 243    return False
 244
 245
 246def is_dtype_numeric(dtype: str) -> bool:
 247    """
 248    Determine whether a given `dtype` string
 249    should be considered compatible with the Meerschaum dtype `numeric`.
 250
 251    Parameters
 252    ----------
 253    dtype: str
 254        The pandas-like dtype string.
 255
 256    Returns
 257    -------
 258    A bool indicating the dtype is compatible with `numeric`.
 259    """
 260    dtype_lower = dtype.lower()
 261
 262    acceptable_substrings = ('numeric', 'float', 'double', 'int')
 263    for substring in acceptable_substrings:
 264        if substring in dtype_lower:
 265            return True
 266
 267    return False
 268
 269
 270def attempt_cast_to_numeric(
 271    value: Any,
 272    quantize: bool = False,
 273    precision: Optional[int] = None,
 274    scale: Optional[int] = None,
  276) -> Any:
 276    """
 277    Given a value, attempt to coerce it into a numeric (Decimal).
 278
 279    Parameters
 280    ----------
 281    value: Any
 282        The value to be cast to a Decimal.
 283
 284    quantize: bool, default False
 285        If `True`, quantize the decimal to the specified precision and scale.
 286
 287    precision: Optional[int], default None
 288        If `quantize` is `True`, use this precision.
 289
 290    scale: Optional[int], default None
 291        If `quantize` is `True`, use this scale.
 292
 293    Returns
 294    -------
 295    A `Decimal` if possible, or `value`.
 296    """
 297    if isinstance(value, Decimal):
 298        if quantize and precision and scale:
 299            return quantize_decimal(value, precision, scale)
 300        return value
 301    try:
 302        if value_is_null(value):
 303            return Decimal('NaN')
 304
 305        dec = Decimal(str(value))
 306        if not quantize or not precision or not scale:
 307            return dec
 308        return quantize_decimal(dec, precision, scale)
 309    except Exception:
 310        return value
 311
 312
 313def attempt_cast_to_uuid(value: Any) -> Any:
 314    """
 315    Given a value, attempt to coerce it into a UUID (`uuid4`).
 316    """
 317    if isinstance(value, uuid.UUID):
 318        return value
 319    try:
 320        return (
 321            uuid.UUID(str(value))
 322            if not value_is_null(value)
 323            else None
 324        )
 325    except Exception:
 326        return value
 327
 328
 329def attempt_cast_to_bytes(value: Any) -> Any:
 330    """
 331    Given a value, attempt to coerce it into a bytestring.
 332    """
 333    if isinstance(value, bytes):
 334        return value
 335    try:
 336        return (
 337            deserialize_bytes_string(str(value))
 338            if not value_is_null(value)
 339            else None
 340        )
 341    except Exception:
 342        return value
 343
 344
 345def attempt_cast_to_geometry(value: Any) -> Any:
 346    """
 347    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
 348    """
 349    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
 350        'shapely',
 351        'shapely.wkt',
 352        'shapely.wkb',
 353        lazy=False,
 354    )
 355    if 'shapely' in str(type(value)):
 356        return value
 357
 358    if isinstance(value, (dict, list)):
 359        try:
 360            return shapely.from_geojson(json.dumps(value))
 361        except Exception:
 362            return value
 363
 364    value_is_gpkg = geometry_is_gpkg(value)
 365    if value_is_gpkg:
 366        try:
 367            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
 368            return shapely_wkb.loads(wkb_data)
 369        except Exception:
 370            return value
 371
 372    value_is_wkt = geometry_is_wkt(value)
 373    if value_is_wkt is None:
 374        return value
 375
 376    try:
 377        return (
 378            shapely_wkt.loads(value)
 379            if value_is_wkt
 380            else shapely_wkb.loads(value)
 381        )
 382    except Exception:
 383        return value
 384
 385
 386def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
 387    """
 388    Determine whether an input value should be treated as WKT or WKB geometry data.
 389
 390    Parameters
 391    ----------
 392    value: Union[str, bytes]
 393        The input data to be parsed into geometry data.
 394
 395    Returns
 396    -------
 397    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
 398    Return `None` if `value` should be parsed as neither.
 399    """
 400    import re
 401    if not isinstance(value, (str, bytes)):
 402        return None
 403
 404    if isinstance(value, bytes):
 405        return False
 406    
 407    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
 408    if re.match(wkt_pattern, value, re.IGNORECASE):
 409        return True
 410    
 411    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
 412        return False
 413    
 414    return None
 415
 416
 417def geometry_is_gpkg(value: bytes) -> bool:
 418    """
 419    Return whether the input `value` is formatted as GeoPackage WKB.
 420    """
 421    if not isinstance(value, bytes) or len(value) < 2:
 422        return False
 423
 424    return value[0:2] == b'GP'
 425
 426def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
 427    """
 428    Converts GeoPackage WKB to standard WKB by removing the header.
 429    
 430    Parameters
 431    ----------
 432    gpkg_wkb_bytes: bytes
 433        The GeoPackage WKB byte string.
 434        
 435    Returns
 436    -------
 437    A tuple containing the standard WKB bytes, SRID, and flags.
 438    """
 439    magic_number = gpkg_wkb_bytes[0:2]
 440    if magic_number != b'GP':
 441        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")
 442    
 443    try:
 444        header = gpkg_wkb_bytes[0:8]
 445        header_vals = struct.unpack('<ccBBi', header)
 446        flags = header_vals[-2]
 447        srid = header_vals[-1]
 448    except struct.error:
 449        header = gpkg_wkb_bytes[0:6]
 450        header_vals = struct.unpack('<ccBBh', header)
 451        flags = header_vals[-2]
 452        srid = header_vals[-1]
 453
 454    envelope_type = (flags >> 1) & 0x07
 455    envelope_sizes = {
 456        0: 0,
 457        1: 32,
 458        2: 48,
 459        3: 48,
 460        4: 64,
 461    }
 462    header_length = 8 + envelope_sizes.get(envelope_type, 0)
 463    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
 464    return standard_wkb_bytes, srid, flags
 465
 466
 467def value_is_null(value: Any) -> bool:
 468    """
 469    Determine if a value is a null-like string.
 470    """
 471    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
 472
 473
 474def none_if_null(value: Any) -> Any:
 475    """
 476    Return `None` if a value is a null-like string.
 477    """
 478    return (None if value_is_null(value) else value)
 479
 480
 481def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
 482    """
 483    Quantize a given `Decimal` to a known scale and precision.
 484
 485    Parameters
 486    ----------
 487    x: Decimal
 488        The `Decimal` to be quantized.
 489
 490    precision: int
 491        The total number of significant digits.
 492
 493    scale: int
 494        The number of significant digits after the decimal point.
 495
 496    Returns
 497    -------
 498    A `Decimal` quantized to the specified scale and precision.
 499    """
 500    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
 501    try:
 502        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
 503    except InvalidOperation:
 504        pass
 505
 506    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
 507
 508
 509def serialize_decimal(
 510    x: Any,
 511    quantize: bool = False,
 512    precision: Optional[int] = None,
 513    scale: Optional[int] = None,
 514) -> Any:
 515    """
 516    Return a quantized string of an input decimal.
 517
 518    Parameters
 519    ----------
 520    x: Any
 521        The potential decimal to be serialized.
 522
 523    quantize: bool, default False
 524        If `True`, quantize the incoming Decimal to the specified scale and precision
 525        before serialization.
 526
 527    precision: Optional[int], default None
 528        The precision of the decimal to be quantized.
 529
 530    scale: Optional[int], default None
 531        The scale of the decimal to be quantized.
 532
 533    Returns
 534    -------
 535    A string of the input decimal or the input if not a Decimal.
 536    """
 537    if not isinstance(x, Decimal):
 538        return x
 539
 540    if value_is_null(x):
 541        return None
 542
 543    if quantize and scale and precision:
 544        x = quantize_decimal(x, precision, scale)
 545
 546    return f"{x:f}"
 547
 548
 549def coerce_timezone(
 550    dt: Any,
 551    strip_utc: bool = False,
 552) -> Any:
 553    """
 554    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
  555    return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
 556    """
 557    if dt is None:
 558        return None
 559
 560    if isinstance(dt, int):
 561        return dt
 562
 563    if isinstance(dt, str):
 564        dateutil_parser = mrsm.attempt_import('dateutil.parser')
 565        try:
 566            dt = dateutil_parser.parse(dt)
 567        except Exception:
 568            return dt
 569
 570    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
 571    if dt_is_series:
 572        pandas = mrsm.attempt_import('pandas', lazy=False)
 573
 574        if (
 575            pandas.api.types.is_datetime64_any_dtype(dt) and (
 576                (dt.dt.tz is not None and not strip_utc)
 577                or
 578                (dt.dt.tz is None and strip_utc)
 579            )
 580        ):
 581            return dt
 582
 583        dt_series = to_datetime(dt, coerce_utc=False)
 584        if dt_series.dt.tz is None:
 585            dt_series = dt_series.dt.tz_localize(timezone.utc)
 586        if strip_utc:
 587            try:
 588                if dt_series.dt.tz is not None:
 589                    dt_series = dt_series.dt.tz_localize(None)
 590            except Exception:
 591                pass
 592
 593        return dt_series
 594
 595    if dt.tzinfo is None:
 596        if strip_utc:
 597            return dt
 598        return dt.replace(tzinfo=timezone.utc)
 599
 600    utc_dt = dt.astimezone(timezone.utc)
 601    if strip_utc:
 602        return utc_dt.replace(tzinfo=None)
 603    return utc_dt
 604
 605
 606def to_datetime(
 607    dt_val: Any,
 608    as_pydatetime: bool = False,
 609    coerce_utc: bool = True,
 610    precision_unit: Optional[str] = None,
 611) -> Any:
 612    """
 613    Wrap `pd.to_datetime()` and add support for out-of-bounds values.
 614
 615    Parameters
 616    ----------
 617    dt_val: Any
 618        The value to coerce to Pandas Timestamps.
 619
 620    as_pydatetime: bool, default False
 621        If `True`, return a Python datetime object.
 622
 623    coerce_utc: bool, default True
 624        If `True`, ensure the value has UTC tzinfo.
 625
 626    precision_unit: Optional[str], default None
 627        If provided, enforce the provided precision unit.
 628    """
 629    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
 630    is_dask = 'dask' in getattr(dt_val, '__module__', '')
 631    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
 632    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
 633    pd = pandas if dd is None else dd
 634    enforce_precision = precision_unit is not None
 635    precision_unit = precision_unit or 'microsecond'
 636    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
 637    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
 638    if not precision_abbreviation:
 639        raise ValueError(f"Invalid precision '{precision_unit}'.")
 640
 641    def parse(x: Any) -> Any:
 642        try:
 643            return dateutil_parser.parse(x)
 644        except Exception:
 645            return x
 646
 647    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
 648        dtype_check_against = (
 649            f"datetime64[{precision_abbreviation}, UTC]"
 650            if with_utc
 651            else f"datetime64[{precision_abbreviation}]"
 652        )
 653        return (
 654            dtype_to_check == dtype_check_against
 655            if enforce_precision
 656            else (
 657                dtype_to_check.startswith('datetime64[')
 658                and (
 659                    ('utc' in dtype_to_check.lower())
 660                    if with_utc
 661                    else ('utc' not in dtype_to_check.lower())
 662                )
 663            )
 664        )
 665
 666    if isinstance(dt_val, pd.Timestamp):
 667        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
 668        return (
 669            coerce_timezone(dt_val_to_return)
 670            if coerce_utc
 671            else dt_val_to_return
 672        )
 673
 674    if dt_is_series:
 675        changed_tz = False
 676        original_tz = None
 677        dtype = str(getattr(dt_val, 'dtype', 'object'))
 678        if (
 679            are_dtypes_equal(dtype, 'datetime')
 680            and 'utc' not in dtype.lower()
 681            and hasattr(dt_val, 'dt')
 682        ):
 683            original_tz = dt_val.dt.tz
 684            dt_val = dt_val.dt.tz_localize(timezone.utc)
 685            changed_tz = True
 686            dtype = str(getattr(dt_val, 'dtype', 'object'))
 687        try:
 688            new_dt_series = (
 689                dt_val
 690                if check_dtype(dtype, with_utc=True)
 691                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
 692            )
 693        except pd.errors.OutOfBoundsDatetime:
 694            try:
 695                next_precision = get_next_precision_unit(true_precision_unit)
  696                next_precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
  697                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbreviation}, UTC]")
 698            except Exception:
 699                new_dt_series = None
 700        except ValueError:
 701            new_dt_series = None
 702        except TypeError:
 703            try:
 704                new_dt_series = (
 705                    new_dt_series
 706                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
 707                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
 708                )
 709            except Exception:
 710                new_dt_series = None
 711
 712        if new_dt_series is None:
 713            new_dt_series = dt_val.apply(lambda x: parse(str(x)))
 714
 715        if coerce_utc:
 716            return coerce_timezone(new_dt_series)
 717
 718        if changed_tz:
 719            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
 720        return new_dt_series
 721
 722    try:
 723        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
 724        if new_dt_val.unit != precision_abbreviation:
 725            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
 726        if as_pydatetime:
 727            return new_dt_val.to_pydatetime()
 728        return new_dt_val
 729    except (pd.errors.OutOfBoundsDatetime, ValueError):
 730        pass
 731
 732    new_dt_val = parse(dt_val)
 733    if not coerce_utc:
 734        return new_dt_val
 735    return coerce_timezone(new_dt_val)
 736
 737
 738def serialize_bytes(data: bytes) -> str:
 739    """
 740    Return the given bytes as a base64-encoded string.
 741    """
 742    import base64
 743    if not isinstance(data, bytes) and value_is_null(data):
 744        return data
 745    return base64.b64encode(data).decode('utf-8')
 746
 747
 748def serialize_geometry(
 749    geom: Any,
 750    geometry_format: str = 'wkb_hex',
 751    srid: Optional[int] = None,
 752) -> Union[str, Dict[str, Any], bytes, None]:
 753    """
 754    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 
 755
 756    Parameters
 757    ----------
 758    geom: Any
 759        The potential geometry data to be serialized.
 760
 761    geometry_format: str, default 'wkb_hex'
 762        The serialization format for geometry data.
 763        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
 764
 765    srid: Optional[int], default None
 766        If provided, use this as the source CRS when serializing to GeoJSON.
 767
 768    Returns
 769    -------
 770    A string containing the geometry data, or bytes, or a dictionary, or None.
 771    """
 772    if value_is_null(geom):
 773        return None
 774
 775    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
 776        'shapely', 'shapely.ops', 'pyproj', 'numpy',
 777        lazy=False,
 778    )
 779    if geometry_format == 'geojson':
 780        if srid:
 781            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
 782            geom = shapely_ops.transform(transformer.transform, geom)
 783        geojson_str = shapely.to_geojson(geom)
 784        return json.loads(geojson_str)
 785
 786    if not hasattr(geom, 'wkb_hex'):
 787        return str(geom)
 788
 789    byte_order = 1 if np.little_endian else 0
 790
 791    if geometry_format.startswith("wkb"):
 792        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
 793    elif geometry_format == 'gpkg_wkb':
 794        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
 795        flags = (
 796            ((byte_order & 0x01) | (0x20))
 797            if geom.is_empty
 798            else (byte_order & 0x01)
 799        )
 800        srid_val = srid or -1
 801        header = struct.pack(
 802            '<ccBBi',
 803            b'G', b'P',
 804            0,
 805            flags,
 806            srid_val
 807        )
 808        return header + wkb_data
 809
 810    return shapely.to_wkt(geom)
 811
 812
 813def deserialize_geometry(geom_wkb: Union[str, bytes]):
 814    """
 815    Deserialize a WKB string into a shapely geometry object.
 816    """
 817    shapely = mrsm.attempt_import('shapely', lazy=False)
 818    return shapely.wkb.loads(geom_wkb)
 819
 820
 821def project_geometry(geom, srid: int, to_srid: int = 4326):
 822    """
 823    Project a shapely geometry object to a new CRS (SRID).
 824    """
 825    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
 826    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
 827    return shapely_ops.transform(transformer.transform, geom)
 828
 829
 830def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
 831    """
 832    Given a serialized ASCII string of bytes data, return the original bytes.
 833    The input data may either be base64- or hex-encoded.
 834
 835    Parameters
 836    ----------
 837    data: Optional[str]
 838        The string to be deserialized into bytes.
 839        May be base64- or hex-encoded (prefixed with `'\\x'`).
 840
  841    force_hex: bool, default False
  842        If `True`, treat the input string as hex-encoded.
  843        Set this to `True` when `data` is hex-encoded but does not begin with the prefix `'\\x'`.
  844        A leading `'\\x'` prefix, if present, is still stripped.
 845
 846    Returns
 847    -------
 848    The original bytes used to produce the encoded string `data`.
 849    """
 850    if not isinstance(data, str) and value_is_null(data):
 851        return data
 852
 853    import binascii
 854    import base64
 855
 856    is_hex = force_hex or data.startswith('\\x')
 857
 858    if is_hex:
 859        if data.startswith('\\x'):
 860            data = data[2:]
 861        return binascii.unhexlify(data)
 862
 863    return base64.b64decode(data)
 864
 865
 866def deserialize_base64(data: str) -> bytes:
 867    """
 868    Return the original bytestring from the given base64-encoded string.
 869    """
 870    import base64
 871    return base64.b64decode(data)
 872
 873
 874def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
 875    """
 876    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
 877    """
 878    import binascii
 879    if not isinstance(data, bytes) and value_is_null(data):
 880        return data
 881    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
 882
 883
 884def serialize_datetime(dt: datetime) -> Union[str, None]:
 885    """
 886    Serialize a datetime object into JSON (ISO format string).
 887
 888    Examples
 889    --------
 890    >>> import json
 891    >>> from datetime import datetime
  892    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
 893    '{"a": "2022-01-01T00:00:00Z"}'
 894
 895    """
 896    if not hasattr(dt, 'isoformat'):
 897        return None
 898
 899    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
 900    return dt.isoformat() + tz_suffix
 901
 902
 903def serialize_date(d: date) -> Union[str, None]:
 904    """
 905    Serialize a date object into its ISO representation.
 906    """
 907    return d.isoformat() if hasattr(d, 'isoformat') else None
 908
 909
 910def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
 911    """
 912    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
 913
 914    Parameters
 915    ----------
 916    x: Any
 917        The value to serialize.
 918
 919    default_to_str: bool, default True
 920        If `True`, return a string of `x` if x is not a designated type.
 921        Otherwise return x.
 922
 923    Returns
 924    -------
 925    A serialized version of x, or x.
 926    """
 927    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
 928        return x.meta
 929
 930    if hasattr(x, 'tzinfo'):
 931        return serialize_datetime(x)
 932
 933    if hasattr(x, 'isoformat'):
 934        return serialize_date(x)
 935
 936    if isinstance(x, bytes):
 937        return serialize_bytes(x)
 938
 939    if isinstance(x, Decimal):
 940        return serialize_decimal(x)
 941
 942    if 'shapely' in str(type(x)):
 943        return serialize_geometry(x)
 944
 945    if value_is_null(x):
 946        return None
 947
 948    if isinstance(x, (dict, list, tuple)):
 949        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
 950
 951    return str(x) if default_to_str else x
 952
 953
 954def get_geometry_type_srid(
 955    dtype: str = 'geometry',
 956    default_type: str = 'geometry',
 957    default_srid: int = 4326,
 958) -> Union[Tuple[str, int], Tuple[str, None]]:
 959    """
 960    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).
 961
 962    Parameters
 963    ----------
 964    dtype: Optional[str], default None
 965        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
 966        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
 967
 968        - `Point`
 969        - `LineString`
 970        - `LinearRing`
 971        - `Polygon`
 972        - `MultiPoint`
 973        - `MultiLineString`
 974        - `MultiPolygon`
 975        - `GeometryCollection`
 976
 977    Returns
 978    -------
 979    A tuple in the form (type, SRID).
 980    Defaults to `(default_type, default_srid)`.
 981
 982    Examples
 983    --------
 984    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
 985    >>> get_geometry_type_srid()
 986    ('geometry', 4326)
 987    >>> get_geometry_type_srid('geometry[]')
 988    ('geometry', 4326)
 989    >>> get_geometry_type_srid('geometry[Point, 0]')
 990    ('Point', 0)
 991    >>> get_geometry_type_srid('geometry[0, Point]')
 992    ('Point', 0)
 993    >>> get_geometry_type_srid('geometry[0]')
 994    ('geometry', 0)
 995    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
 996    ('MultiLineString', 4326)
 997    >>> get_geometry_type_srid('geography')
 998    ('geometry', 4326)
 999    >>> get_geometry_type_srid('geography[POINT]')
 1000    ('Point', 4326)
1001    """
1002    from meerschaum.utils.misc import is_int
1003    ### NOTE: PostGIS syntax must also be parsed.
1004    dtype = dtype.replace('(', '[').replace(')', ']')
1005    bare_dtype = dtype.split('[', maxsplit=1)[0]
1006    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
1007    if not modifier:
1008        return default_type, default_srid
1009
1010    parts = [
1011        part.split('=')[-1].strip()
1012        for part in modifier.split(',')
1013    ]
1014    parts_casted = [
1015        (
1016            int(part)
1017            if is_int(part)
1018            else part
1019        )
1020        for part in parts
1021    ]
1022
1023    srid = default_srid
1024    geometry_type = default_type
1025
1026    for part in parts_casted:
1027        if isinstance(part, int):
1028            srid = part
1029            break
1030
1031    for part in parts_casted:
1032        if isinstance(part, str):
1033            geometry_type = part
1034            break
1035
1036    return geometry_type, srid
1037
1038
1039def get_current_timestamp(
1040    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
1041    precision_interval: int = 1,
1042    round_to: str = 'down',
1043    as_pandas: bool = False,
1044    as_int: bool = False,
1045    _now: Union[datetime, int, None] = None,
1046) -> 'Union[datetime, pd.Timestamp, int]':
1047    """
 1048    Return the current UTC timestamp, rounded to the given precision unit and interval.
1049
1050    Parameters
1051    ----------
1052    precision_unit: str, default 'us'
1053        The precision of the timestamp to be returned.
1054        Valid values are the following:
1055            - `ns` / `nanosecond`
1056            - `us` / `microsecond`
1057            - `ms` / `millisecond`
1058            - `s` / `sec` / `second`
1059            - `m` / `min` / `minute`
1060            - `h` / `hr` / `hour`
1061            - `d` / `day`
1062
1063    precision_interval: int, default 1
1064        Round the timestamp to the `precision_interval` units.
 1065        For example, `precision_unit='minute'` and `precision_interval=15` will round to 15-minute intervals.
 1066        Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`.
1067
1068    round_to: str, default 'down'
1069        The direction to which to round the timestamp.
1070        Available options are `down`, `up`, and `closest`.
1071
1072    as_pandas: bool, default False
1073        If `True`, return a Pandas Timestamp.
 1074        This is always the case when `precision_unit` is `nanosecond`.
1075
1076    as_int: bool, default False
 1077        If `True`, return the timestamp as an integer.
1078        Overrides `as_pandas`.
1079
1080    Returns
1081    -------
1082    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
1083
1084    Examples
1085    --------
1086    >>> get_current_timestamp('ns')
1087    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
1088    >>> get_current_timestamp('ms')
1089    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
1090    """
1091    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1092    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
1093        from meerschaum.utils.misc import items_str
1094        raise ValueError(
1095            f"Unknown precision unit '{precision_unit}'. "
1096            "Accepted values are "
1097            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
1098        )
1099
1100    if not as_int:
1101        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
1102    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None
1103
1104    if true_precision_unit == 'nanosecond':
1105        if precision_interval != 1:
1106            warn("`precision_interval` must be 1 for nanosecond precision.")
1107        now_ts = time.time_ns() if not isinstance(_now, int) else _now
1108        if as_int:
1109            return now_ts
1110        return pd.to_datetime(now_ts, unit='ns', utc=True)
1111
1112    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
1113    delta = timedelta(**{true_precision_unit + 's': precision_interval})
1114    rounded_now = round_time(now, delta, to=round_to)
1115
1116    if as_int:
1117        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])
1118
1119    ts_val = (
1120        pd.to_datetime(rounded_now, utc=True)
1121        if as_pandas
1122        else rounded_now
1123    )
1124
1125    if not as_pandas:
1126        return ts_val
1127
1128    as_unit_precisions = ('microsecond', 'millisecond', 'second')
1129    if true_precision_unit not in as_unit_precisions:
1130        return ts_val
1131
1132    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
1133
1134
1135def is_dtype_special(type_: str) -> bool:
1136    """
1137    Return whether a dtype should be treated as a special Meerschaum dtype.
1138    This is not the same as a Meerschaum alias.
1139    """
1140    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
1141    if true_type in (
1142        'uuid',
1143        'json',
1144        'bytes',
1145        'numeric',
1146        'datetime',
1147        'geometry',
1148        'geography',
1149        'date',
1150        'bool',
1151    ):
1152        return True
1153
1154    if are_dtypes_equal(true_type, 'datetime'):
1155        return True
1156
1157    if are_dtypes_equal(true_type, 'date'):
1158        return True
1159
1160    if true_type.startswith('numeric'):
1161        return True
1162
1163    if true_type.startswith('bool'):
1164        return True
1165
1166    if true_type.startswith('geometry'):
1167        return True
1168
1169    if true_type.startswith('geography'):
1170        return True
1171
1172    return False
1173
1174
1175def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
1176    """
1177    Get the next precision string in order of value.
1178
1179    Parameters
1180    ----------
1181    precision_unit: str
1182        The precision string (`'nanosecond'`, `'ms'`, etc.).
1183
 1184    decrease: bool, default True
 1185        If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
1186        If `False`, return the precision unit which is higher.
1187
1188    Returns
1189    -------
1190    A `precision` string which is lower or higher than the given precision unit.
1191
1192    Examples
1193    --------
1194    >>> get_next_precision_unit('nanosecond')
1195    'microsecond'
1196    >>> get_next_precision_unit('ms')
1197    'second'
1198    >>> get_next_precision_unit('hour', decrease=False)
1199    'minute'
1200    """
1201    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1202    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
1203    if not precision_scalar:
1204        raise ValueError(f"Invalid precision unit '{precision_unit}'.")
1205
1206    precisions = sorted(
1207        list(MRSM_PRECISION_UNITS_SCALARS),
1208        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
1209    )
1210
1211    precision_index = precisions.index(true_precision_unit)
1212    new_precision_index = precision_index + (-1 if decrease else 1)
1213    if new_precision_index < 0 or new_precision_index >= len(precisions):
1214        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")
1215
1216    return precisions[new_precision_index]
1217
1218
1219def round_time(
1220    dt: Optional[datetime] = None,
1221    date_delta: Optional[timedelta] = None,
1222    to: 'str' = 'down'
1223) -> datetime:
1224    """
1225    Round a datetime object to a multiple of a timedelta.
1226
1227    Parameters
1228    ----------
1229    dt: Optional[datetime], default None
1230        If `None`, grab the current UTC datetime.
1231
1232    date_delta: Optional[timedelta], default None
1233        If `None`, use a delta of 1 minute.
1234
1235    to: 'str', default 'down'
1236        Available options are `'up'`, `'down'`, and `'closest'`.
1237
1238    Returns
1239    -------
1240    A rounded `datetime` object.
1241
1242    Examples
1243    --------
1244    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
1245    datetime.datetime(2022, 1, 1, 12, 15)
1246    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
1247    datetime.datetime(2022, 1, 1, 12, 16)
1248    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
1249    datetime.datetime(2022, 1, 1, 12, 0)
1250    >>> round_time(
1251    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
1252    ...   timedelta(hours=1),
1253    ...   to = 'closest'
1254    ... )
1255    datetime.datetime(2022, 1, 1, 12, 0)
1256    >>> round_time(
1257    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
 1258    ...   timedelta(hours=1),
1259    ...   to = 'closest'
1260    ... )
1261    datetime.datetime(2022, 1, 1, 13, 0)
1262
1263    """
1264    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
1265    if date_delta is None:
1266        date_delta = timedelta(minutes=1)
1267
1268    if dt is None:
1269        dt = datetime.now(timezone.utc).replace(tzinfo=None)
1270
1271    def get_total_microseconds(td: timedelta) -> int:
1272        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds
1273
1274    round_to_microseconds = get_total_microseconds(date_delta)
1275    if round_to_microseconds == 0:
1276        return dt
1277
1278    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
1279    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)
1280
1281    dt_dec = Decimal(dt_total_microseconds)
1282    round_to_dec = Decimal(round_to_microseconds)
1283
1284    div = dt_dec / round_to_dec
1285    if to == 'down':
1286        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
1287    elif to == 'up':
1288        num_intervals = div.to_integral_value(rounding=ROUND_UP)
1289    else:
1290        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)
1291
1292    rounded_dt_total_microseconds = num_intervals * round_to_dec
1293    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds
1294
1295    return dt + timedelta(microseconds=adjustment_microseconds)
MRSM_ALIAS_DTYPES: Dict[str, str] = {'decimal': 'numeric', 'Decimal': 'numeric', 'number': 'numeric', 'jsonl': 'json', 'JSON': 'json', 'binary': 'bytes', 'blob': 'bytes', 'varbinary': 'bytes', 'bytea': 'bytes', 'guid': 'uuid', 'UUID': 'uuid', 'geom': 'geometry', 'geog': 'geography', 'boolean': 'bool', 'day': 'date'}
MRSM_PD_DTYPES: Dict[Optional[str], str] = {'json': 'object', 'numeric': 'object', 'geometry': 'object', 'geography': 'object', 'uuid': 'object', 'date': 'date32[day][pyarrow]', 'datetime': 'datetime64[us, UTC]', 'bool': 'bool[pyarrow]', 'int': 'int64[pyarrow]', 'int8': 'int8[pyarrow]', 'int16': 'int16[pyarrow]', 'int32': 'int32[pyarrow]', 'int64': 'int64[pyarrow]', 'str': 'string', 'bytes': 'binary[pyarrow]', None: 'object'}
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {'nanosecond': 1000000000, 'microsecond': 1000000, 'millisecond': 1000, 'second': 1, 'minute': 0.016666666666666666, 'hour': 0.0002777777777777778, 'day': 1.1574074074074073e-05}
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {'ns': 'nanosecond', 'us': 'microsecond', 'ms': 'millisecond', 's': 'second', 'sec': 'second', 'm': 'minute', 'min': 'minute', 'h': 'hour', 'hr': 'hour', 'd': 'day', 'D': 'day'}
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {'nanosecond': 'ns', 'microsecond': 'us', 'millisecond': 'ms', 'second': 's', 'minute': 'min', 'hour': 'hr', 'day': 'D'}
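
These dictionaries drive the dtype and precision normalization throughout the module. A minimal lookup sketch (values taken directly from the mappings above):

    >>> from meerschaum.utils.dtypes import (
    ...     MRSM_ALIAS_DTYPES, MRSM_PD_DTYPES,
    ...     MRSM_PRECISION_UNITS_ALIASES, MRSM_PRECISION_UNITS_ABBREVIATIONS,
    ... )
    >>> MRSM_ALIAS_DTYPES['decimal']
    'numeric'
    >>> MRSM_PD_DTYPES['numeric']
    'object'
    >>> MRSM_PRECISION_UNITS_ALIASES['ms'], MRSM_PRECISION_UNITS_ABBREVIATIONS['millisecond']
    ('millisecond', 'ms')
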
def to_pandas_dtype(dtype: str) -> str:

Cast a supported Meerschaum dtype to a Pandas dtype.
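
A minimal usage sketch (illustrative, not from the module docstring; the returned pandas dtype strings follow the mappings above):

    >>> from meerschaum.utils.dtypes import to_pandas_dtype
    >>> to_pandas_dtype('numeric')
    'object'
    >>> to_pandas_dtype('decimal')    # alias resolves to 'numeric'
    'object'
    >>> to_pandas_dtype('datetime')
    'datetime64[us, UTC]'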

def are_dtypes_equal(ldtype: Union[str, Dict[str, str]], rdtype: Union[str, Dict[str, str]]) -> bool:

Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.

Parameters
  • ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
  • rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
  • A bool indicating whether the two dtypes are to be considered equivalent.
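
A minimal sketch of the equivalence rules (illustrative examples, not from the original docstring):

    >>> from meerschaum.utils.dtypes import are_dtypes_equal
    >>> are_dtypes_equal('datetime64[ns, UTC]', 'datetime')
    True
    >>> are_dtypes_equal('int64[pyarrow]', 'Int32')
    True
    >>> are_dtypes_equal('float64', 'int64')
    False
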
def is_dtype_numeric(dtype: str) -> bool:

Determine whether a given dtype string should be considered compatible with the Meerschaum dtype numeric.

Parameters
  • dtype (str): The pandas-like dtype string.
Returns
  • A bool indicating the dtype is compatible with numeric.
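
An illustrative sketch (any dtype string containing 'numeric', 'float', 'double', or 'int' qualifies):

    >>> from meerschaum.utils.dtypes import is_dtype_numeric
    >>> is_dtype_numeric('float64')
    True
    >>> is_dtype_numeric('int32[pyarrow]')
    True
    >>> is_dtype_numeric('string')
    False
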
def attempt_cast_to_numeric(value: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:

Given a value, attempt to coerce it into a numeric (Decimal).

Parameters
  • value (Any): The value to be cast to a Decimal.
  • quantize (bool, default False): If True, quantize the decimal to the specified precision and scale.
  • precision (Optional[int], default None): If quantize is True, use this precision.
  • scale (Optional[int], default None): If quantize is True, use this scale.
Returns
  • A Decimal if possible, or value.
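
A minimal sketch (illustrative values; input that cannot be parsed is returned unchanged):

    >>> from decimal import Decimal
    >>> from meerschaum.utils.dtypes import attempt_cast_to_numeric
    >>> attempt_cast_to_numeric('1.5')
    Decimal('1.5')
    >>> attempt_cast_to_numeric('1.2345', quantize=True, precision=5, scale=2)
    Decimal('1.23')
    >>> attempt_cast_to_numeric('not a number')
    'not a number'
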
def attempt_cast_to_uuid(value: Any) -> Any:

Given a value, attempt to coerce it into a UUID (uuid4).
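
A minimal sketch (illustrative; unparseable values are returned unchanged):

    >>> from meerschaum.utils.dtypes import attempt_cast_to_uuid
    >>> attempt_cast_to_uuid('12345678-1234-5678-1234-567812345678')
    UUID('12345678-1234-5678-1234-567812345678')
    >>> attempt_cast_to_uuid('not-a-uuid')
    'not-a-uuid'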

def attempt_cast_to_bytes(value: Any) -> Any:

Given a value, attempt to coerce it into a bytestring.
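
A minimal sketch (illustrative; strings are assumed to be base64-encoded, or hex-encoded with a leading '\x' prefix):

    >>> from meerschaum.utils.dtypes import attempt_cast_to_bytes
    >>> attempt_cast_to_bytes('aGVsbG8=')    # base64 for b'hello'
    b'hello'
    >>> attempt_cast_to_bytes(b'already bytes')
    b'already bytes'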

def attempt_cast_to_geometry(value: Any) -> Any:

Given a value, attempt to coerce it into a shapely (geometry) object.
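
A minimal sketch, assuming `shapely` is installed (illustrative, not from the original docstring):

    >>> from meerschaum.utils.dtypes import attempt_cast_to_geometry
    >>> point = attempt_cast_to_geometry('POINT (1 2)')
    >>> point.x, point.y
    (1.0, 2.0)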

def geometry_is_wkt(value: Union[str, bytes]) -> Optional[bool]:

Determine whether an input value should be treated as WKT or WKB geometry data.

Parameters
  • value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
  • A bool (True if value is WKT and False if it should be treated as WKB).
  • Return None if value should be parsed as neither.
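
Usage sketch:
>>> geometry_is_wkt('POINT (0 0)')
True
>>> geometry_is_wkt('0101000000000000000000f03f000000000000f03f')
False
>>> geometry_is_wkt(42) is None
True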
def geometry_is_gpkg(value: bytes) -> bool:
418def geometry_is_gpkg(value: bytes) -> bool:
419    """
420    Return whether the input `value` is formatted as GeoPackage WKB.
421    """
422    if not isinstance(value, bytes) or len(value) < 2:
423        return False
424
425    return value[0:2] == b'GP'

Return whether the input value is formatted as GeoPackage WKB.
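
Usage sketch (only the leading GP magic bytes are checked):
>>> geometry_is_gpkg(b'GP\x00\x00\xe6\x10\x00\x00')
True
>>> geometry_is_gpkg(b'\x01\x01\x00\x00\x00')
False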

def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
427def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
428    """
429    Converts GeoPackage WKB to standard WKB by removing the header.
430    
431    Parameters
432    ----------
433    gpkg_wkb_bytes: bytes
434        The GeoPackage WKB byte string.
435        
436    Returns
437    -------
438    A tuple containing the standard WKB bytes, SRID, and flags.
439    """
440    magic_number = gpkg_wkb_bytes[0:2]
441    if magic_number != b'GP':
442        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")
443    
444    try:
445        header = gpkg_wkb_bytes[0:8]
446        header_vals = struct.unpack('<ccBBi', header)
447        flags = header_vals[-2]
448        srid = header_vals[-1]
449    except struct.error:
450        header = gpkg_wkb_bytes[0:6]
451        header_vals = struct.unpack('<ccBBh', header)
452        flags = header_vals[-2]
453        srid = header_vals[-1]
454
455    envelope_type = (flags >> 1) & 0x07
456    envelope_sizes = {
457        0: 0,
458        1: 32,
459        2: 48,
460        3: 48,
461        4: 64,
462    }
463    header_length = 8 + envelope_sizes.get(envelope_type, 0)
464    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
465    return standard_wkb_bytes, srid, flags

Converts GeoPackage WKB to standard WKB by removing the header.

Parameters
  • gpkg_wkb_bytes (bytes): The GeoPackage WKB byte string.
Returns
  • A tuple containing the standard WKB bytes, SRID, and flags.
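
Usage sketch, building a minimal GeoPackage header by hand (version 0, no envelope, SRID 4326):
>>> import struct
>>> header = struct.pack('<ccBBi', b'G', b'P', 0, 0, 4326)
>>> wkb, srid, flags = gpkg_wkb_to_wkb(header + b'\x01\x01\x00\x00\x00')
>>> (srid, flags)
(4326, 0)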
def value_is_null(value: Any) -> bool:
468def value_is_null(value: Any) -> bool:
469    """
470    Determine if a value is a null-like string.
471    """
472    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')

Determine if a value is a null-like string.
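
Usage sketch:
>>> value_is_null('NaN')
True
>>> value_is_null(0)
False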

def none_if_null(value: Any) -> Any:
475def none_if_null(value: Any) -> Any:
476    """
477    Return `None` if a value is a null-like string.
478    """
479    return (None if value_is_null(value) else value)

Return None if a value is a null-like string.
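
Usage sketch:
>>> none_if_null('<NA>') is None
True
>>> none_if_null('hello')
'hello'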

def quantize_decimal(x: decimal.Decimal, precision: int, scale: int) -> decimal.Decimal:
482def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
483    """
484    Quantize a given `Decimal` to a known scale and precision.
485
486    Parameters
487    ----------
488    x: Decimal
489        The `Decimal` to be quantized.
490
491    precision: int
492        The total number of significant digits.
493
494    scale: int
495        The number of significant digits after the decimal point.
496
497    Returns
498    -------
499    A `Decimal` quantized to the specified scale and precision.
500    """
501    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
502    try:
503        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
504    except InvalidOperation:
505        pass
506
507    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")

Quantize a given Decimal to a known scale and precision.

Parameters
  • x (Decimal): The Decimal to be quantized.
  • precision (int): The total number of significant digits.
  • scale (int): The number of significant digits after the decimal point.
Returns
  • A Decimal quantized to the specified scale and precision.
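
Usage sketch (half-up rounding):
>>> from decimal import Decimal
>>> quantize_decimal(Decimal('123.4567'), 6, 2)
Decimal('123.46')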
def serialize_decimal( x: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
510def serialize_decimal(
511    x: Any,
512    quantize: bool = False,
513    precision: Optional[int] = None,
514    scale: Optional[int] = None,
515) -> Any:
516    """
517    Return a quantized string of an input decimal.
518
519    Parameters
520    ----------
521    x: Any
522        The potential decimal to be serialized.
523
524    quantize: bool, default False
525        If `True`, quantize the incoming Decimal to the specified scale and precision
526        before serialization.
527
528    precision: Optional[int], default None
529        The precision of the decimal to be quantized.
530
531    scale: Optional[int], default None
532        The scale of the decimal to be quantized.
533
534    Returns
535    -------
536    A string of the input decimal or the input if not a Decimal.
537    """
538    if not isinstance(x, Decimal):
539        return x
540
541    if value_is_null(x):
542        return None
543
544    if quantize and scale and precision:
545        x = quantize_decimal(x, precision, scale)
546
547    return f"{x:f}"

Return a quantized string of an input decimal.

Parameters
  • x (Any): The potential decimal to be serialized.
  • quantize (bool, default False): If True, quantize the incoming Decimal to the specified scale and precision before serialization.
  • precision (Optional[int], default None): The precision of the decimal to be quantized.
  • scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
  • A string of the input decimal or the input if not a Decimal.
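
Usage sketch:
>>> from decimal import Decimal
>>> serialize_decimal(Decimal('1.250'))
'1.250'
>>> serialize_decimal(Decimal('1.256'), quantize=True, precision=4, scale=2)
'1.26'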
def coerce_timezone(dt: Any, strip_utc: bool = False) -> Any:
550def coerce_timezone(
551    dt: Any,
552    strip_utc: bool = False,
553) -> Any:
554    """
555    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
556    return a UTC timestamp (strip timezone if `strip_utc` is `True`).
557    """
558    if dt is None:
559        return None
560
561    if isinstance(dt, int):
562        return dt
563
564    if isinstance(dt, str):
565        dateutil_parser = mrsm.attempt_import('dateutil.parser')
566        try:
567            dt = dateutil_parser.parse(dt)
568        except Exception:
569            return dt
570
571    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
572    if dt_is_series:
573        pandas = mrsm.attempt_import('pandas', lazy=False)
574
575        if (
576            pandas.api.types.is_datetime64_any_dtype(dt) and (
577                (dt.dt.tz is not None and not strip_utc)
578                or
579                (dt.dt.tz is None and strip_utc)
580            )
581        ):
582            return dt
583
584        dt_series = to_datetime(dt, coerce_utc=False)
585        if dt_series.dt.tz is None:
586            dt_series = dt_series.dt.tz_localize(timezone.utc)
587        if strip_utc:
588            try:
589                if dt_series.dt.tz is not None:
590                    dt_series = dt_series.dt.tz_localize(None)
591            except Exception:
592                pass
593
594        return dt_series
595
596    if dt.tzinfo is None:
597        if strip_utc:
598            return dt
599        return dt.replace(tzinfo=timezone.utc)
600
601    utc_dt = dt.astimezone(timezone.utc)
602    if strip_utc:
603        return utc_dt.replace(tzinfo=None)
604    return utc_dt

Given a datetime, pandas Timestamp, or Series of Timestamps, return a UTC timestamp (strip timezone if strip_utc is True).
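
Usage sketch with plain datetime objects (pandas Series are handled analogously):
>>> from datetime import datetime, timezone
>>> coerce_timezone(datetime(2024, 1, 1))
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> coerce_timezone(datetime(2024, 1, 1, tzinfo=timezone.utc), strip_utc=True)
datetime.datetime(2024, 1, 1, 0, 0)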

def to_datetime( dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True, precision_unit: Optional[str] = None) -> Any:
607def to_datetime(
608    dt_val: Any,
609    as_pydatetime: bool = False,
610    coerce_utc: bool = True,
611    precision_unit: Optional[str] = None,
612) -> Any:
613    """
614    Wrap `pd.to_datetime()` and add support for out-of-bounds values.
615
616    Parameters
617    ----------
618    dt_val: Any
619        The value to coerce to Pandas Timestamps.
620
621    as_pydatetime: bool, default False
622        If `True`, return a Python datetime object.
623
624    coerce_utc: bool, default True
625        If `True`, ensure the value has UTC tzinfo.
626
627    precision_unit: Optional[str], default None
628        If provided, enforce the provided precision unit.
629    """
630    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
631    is_dask = 'dask' in getattr(dt_val, '__module__', '')
632    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
633    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
634    pd = pandas if dd is None else dd
635    enforce_precision = precision_unit is not None
636    precision_unit = precision_unit or 'microsecond'
637    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
638    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
639    if not precision_abbreviation:
640        raise ValueError(f"Invalid precision '{precision_unit}'.")
641
642    def parse(x: Any) -> Any:
643        try:
644            return dateutil_parser.parse(x)
645        except Exception:
646            return x
647
648    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
649        dtype_check_against = (
650            f"datetime64[{precision_abbreviation}, UTC]"
651            if with_utc
652            else f"datetime64[{precision_abbreviation}]"
653        )
654        return (
655            dtype_to_check == dtype_check_against
656            if enforce_precision
657            else (
658                dtype_to_check.startswith('datetime64[')
659                and (
660                    ('utc' in dtype_to_check.lower())
661                    if with_utc
662                    else ('utc' not in dtype_to_check.lower())
663                )
664            )
665        )
666
667    if isinstance(dt_val, pd.Timestamp):
668        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
669        return (
670            coerce_timezone(dt_val_to_return)
671            if coerce_utc
672            else dt_val_to_return
673        )
674
675    if dt_is_series:
676        changed_tz = False
677        original_tz = None
678        dtype = str(getattr(dt_val, 'dtype', 'object'))
679        if (
680            are_dtypes_equal(dtype, 'datetime')
681            and 'utc' not in dtype.lower()
682            and hasattr(dt_val, 'dt')
683        ):
684            original_tz = dt_val.dt.tz
685            dt_val = dt_val.dt.tz_localize(timezone.utc)
686            changed_tz = True
687            dtype = str(getattr(dt_val, 'dtype', 'object'))
688        try:
689            new_dt_series = (
690                dt_val
691                if check_dtype(dtype, with_utc=True)
692                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
693            )
694        except pd.errors.OutOfBoundsDatetime:
695            try:
696                next_precision = get_next_precision_unit(true_precision_unit)
697                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
698                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
699            except Exception:
700                new_dt_series = None
701        except ValueError:
702            new_dt_series = None
703        except TypeError:
704            try:
705                new_dt_series = (
706                    new_dt_series
707                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
708                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
709                )
710            except Exception:
711                new_dt_series = None
712
713        if new_dt_series is None:
714            new_dt_series = dt_val.apply(lambda x: parse(str(x)))
715
716        if coerce_utc:
717            return coerce_timezone(new_dt_series)
718
719        if changed_tz:
720            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
721        return new_dt_series
722
723    try:
724        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
725        if new_dt_val.unit != precision_abbreviation:
726            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
727        if as_pydatetime:
728            return new_dt_val.to_pydatetime()
729        return new_dt_val
730    except (pd.errors.OutOfBoundsDatetime, ValueError):
731        pass
732
733    new_dt_val = parse(dt_val)
734    if not coerce_utc:
735        return new_dt_val
736    return coerce_timezone(new_dt_val)

Wrap pd.to_datetime() and add support for out-of-bounds values.

Parameters
  • dt_val (Any): The value to coerce to Pandas Timestamps.
  • as_pydatetime (bool, default False): If True, return a Python datetime object.
  • coerce_utc (bool, default True): If True, ensure the value has UTC tzinfo.
  • precision_unit (Optional[str], default None): If provided, enforce the provided precision unit.
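
Usage sketch (assumes pandas and dateutil are installed; non-ISO strings fall back to dateutil parsing, and reprs may vary by version):
>>> to_datetime('2024-01-01')
Timestamp('2024-01-01 00:00:00+0000', tz='UTC')
>>> to_datetime('January 1, 2024')
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)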
def serialize_bytes(data: bytes) -> str:
739def serialize_bytes(data: bytes) -> str:
740    """
741    Return the given bytes as a base64-encoded string.
742    """
743    import base64
744    if not isinstance(data, bytes) and value_is_null(data):
745        return data
746    return base64.b64encode(data).decode('utf-8')

Return the given bytes as a base64-encoded string.
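
Usage sketch:
>>> serialize_bytes(b'hello')
'aGVsbG8='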

def serialize_geometry( geom: Any, geometry_format: str = 'wkb_hex', srid: Optional[int] = None) -> Union[str, Dict[str, Any], bytes, NoneType]:
749def serialize_geometry(
750    geom: Any,
751    geometry_format: str = 'wkb_hex',
752    srid: Optional[int] = None,
753) -> Union[str, Dict[str, Any], bytes, None]:
754    """
755    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 
756
757    Parameters
758    ----------
759    geom: Any
760        The potential geometry data to be serialized.
761
762    geometry_format: str, default 'wkb_hex'
763        The serialization format for geometry data.
764        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
765
766    srid: Optional[int], default None
767        If provided, use this as the source CRS when serializing to GeoJSON.
768
769    Returns
770    -------
771    A string containing the geometry data, or bytes, or a dictionary, or None.
772    """
773    if value_is_null(geom):
774        return None
775
776    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
777        'shapely', 'shapely.ops', 'pyproj', 'numpy',
778        lazy=False,
779    )
780    if geometry_format == 'geojson':
781        if srid:
782            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
783            geom = shapely_ops.transform(transformer.transform, geom)
784        geojson_str = shapely.to_geojson(geom)
785        return json.loads(geojson_str)
786
787    if not hasattr(geom, 'wkb_hex'):
788        return str(geom)
789
790    byte_order = 1 if np.little_endian else 0
791
792    if geometry_format.startswith("wkb"):
793        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
794    elif geometry_format == 'gpkg_wkb':
795        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
796        flags = (
797            ((byte_order & 0x01) | (0x20))
798            if geom.is_empty
799            else (byte_order & 0x01)
800        )
801        srid_val = srid or -1
802        header = struct.pack(
803            '<ccBBi',
804            b'G', b'P',
805            0,
806            flags,
807            srid_val
808        )
809        return header + wkb_data
810
811    return shapely.to_wkt(geom)

Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

Parameters
  • geom (Any): The potential geometry data to be serialized.
  • geometry_format (str, default 'wkb_hex'): The serialization format for geometry data. Accepted formats are wkb, wkb_hex, wkt, geojson, and gpkg_wkb.
  • srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON.
Returns
  • A string containing the geometry data, or bytes, or a dictionary, or None.
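
Usage sketch (assumes shapely, pyproj, and numpy are installed):
>>> import shapely
>>> serialize_geometry(shapely.Point(1, 1), geometry_format='wkt')
'POINT (1 1)'
>>> serialize_geometry(shapely.Point(1, 1), geometry_format='geojson')
{'type': 'Point', 'coordinates': [1.0, 1.0]}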
def deserialize_geometry(geom_wkb: Union[str, bytes]):
814def deserialize_geometry(geom_wkb: Union[str, bytes]):
815    """
816    Deserialize a WKB string into a shapely geometry object.
817    """
818    shapely = mrsm.attempt_import('shapely', lazy=False)
819    return shapely.wkb.loads(geom_wkb)

Deserialize a WKB string into a shapely geometry object.
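
Usage sketch (assumes shapely is installed; the hex below encodes POINT (1 2)):
>>> deserialize_geometry(bytes.fromhex('0101000000000000000000f03f0000000000000040'))
<POINT (1 2)>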

def project_geometry(geom, srid: int, to_srid: int = 4326):
822def project_geometry(geom, srid: int, to_srid: int = 4326):
823    """
824    Project a shapely geometry object to a new CRS (SRID).
825    """
826    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
827    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
828    return shapely_ops.transform(transformer.transform, geom)

Project a shapely geometry object to a new CRS (SRID).
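
Usage sketch (assumes shapely and pyproj are installed; projected coordinates depend on the CRS definitions, so only the geometry type is shown):
>>> import shapely
>>> projected = project_geometry(shapely.Point(-71.06, 42.36), srid=4326, to_srid=3857)
>>> projected.geom_type
'Point'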

def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Optional[bytes]:
831def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
832    """
833    Given a serialized ASCII string of bytes data, return the original bytes.
834    The input data may either be base64- or hex-encoded.
835
836    Parameters
837    ----------
838    data: Optional[str]
839        The string to be deserialized into bytes.
840        May be base64- or hex-encoded (prefixed with `'\\x'`).
841
842    force_hex: bool = False
843        If `True`, treat the input string as hex-encoded.
844        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
845        This will still strip the leading `'\\x'` prefix if present.
846
847    Returns
848    -------
849    The original bytes used to produce the encoded string `data`.
850    """
851    if not isinstance(data, str) and value_is_null(data):
852        return data
853
854    import binascii
855    import base64
856
857    is_hex = force_hex or data.startswith('\\x')
858
859    if is_hex:
860        if data.startswith('\\x'):
861            data = data[2:]
862        return binascii.unhexlify(data)
863
864    return base64.b64decode(data)

Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.

Parameters
  • data (Optional[str]): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with '\x').
  • force_hex (bool = False): If True, treat the input string as hex-encoded. If data does not begin with the prefix '\x', set force_hex to True. This will still strip the leading '\x' prefix if present.
Returns
  • The original bytes used to produce the encoded string data.
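
Usage sketch (base64 and hex inputs):
>>> deserialize_bytes_string('aGVsbG8=')
b'hello'
>>> deserialize_bytes_string('\\x68656c6c6f')
b'hello'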
def deserialize_base64(data: str) -> bytes:
867def deserialize_base64(data: str) -> bytes:
868    """
869    Return the original bytestring from the given base64-encoded string.
870    """
871    import base64
872    return base64.b64decode(data)

Return the original bytestring from the given base64-encoded string.

def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Optional[str]:
875def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
876    """
877    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
878    """
879    import binascii
880    if not isinstance(data, bytes) and value_is_null(data):
881        return data
882    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')

Return the given bytes as a hex string for PostgreSQL's BYTEA type.
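
Usage sketch:
>>> encode_bytes_for_bytea(b'hello')
'\\x68656c6c6f'
>>> encode_bytes_for_bytea(b'hello', with_prefix=False)
'68656c6c6f'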

def serialize_datetime(dt: datetime.datetime) -> Optional[str]:
885def serialize_datetime(dt: datetime) -> Union[str, None]:
886    """
887    Serialize a datetime object into JSON (ISO format string).
888
889    Examples
890    --------
891    >>> import json
892    >>> from datetime import datetime
893    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
894    '{"a": "2022-01-01T00:00:00Z"}'
895
896    """
897    if not hasattr(dt, 'isoformat'):
898        return None
899
900    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
901    return dt.isoformat() + tz_suffix

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def serialize_date(d: datetime.date) -> Optional[str]:
904def serialize_date(d: date) -> Union[str, None]:
905    """
906    Serialize a date object into its ISO representation.
907    """
908    return d.isoformat() if hasattr(d, 'isoformat') else None

Serialize a date object into its ISO representation.
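
Usage sketch:
>>> from datetime import date
>>> serialize_date(date(2024, 1, 1))
'2024-01-01'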

def json_serialize_value(x: Any, default_to_str: bool = True) -> Optional[str]:
911def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
912    """
913    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
914
915    Parameters
916    ----------
917    x: Any
918        The value to serialize.
919
920    default_to_str: bool, default True
921        If `True`, return a string of `x` if x is not a designated type.
922        Otherwise return x.
923
924    Returns
925    -------
926    A serialized version of x, or x.
927    """
928    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
929        return x.meta
930
931    if hasattr(x, 'tzinfo'):
932        return serialize_datetime(x)
933
934    if hasattr(x, 'isoformat'):
935        return serialize_date(x)
936
937    if isinstance(x, bytes):
938        return serialize_bytes(x)
939
940    if isinstance(x, Decimal):
941        return serialize_decimal(x)
942
943    if 'shapely' in str(type(x)):
944        return serialize_geometry(x)
945
946    if value_is_null(x):
947        return None
948
949    if isinstance(x, (dict, list, tuple)):
950        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
951
952    return str(x) if default_to_str else x

Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

Parameters
  • x (Any): The value to serialize.
  • default_to_str (bool, default True): If True, return a string of x if x is not a designated type. Otherwise return x.
Returns
  • A serialized version of x, or x.
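
Usage sketch (typically passed as default= to json.dumps):
>>> import json
>>> from decimal import Decimal
>>> from datetime import datetime
>>> json.dumps({'a': Decimal('1.5'), 'b': datetime(2024, 1, 1)}, default=json_serialize_value)
'{"a": "1.5", "b": "2024-01-01T00:00:00Z"}'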
def get_geometry_type_srid( dtype: str = 'geometry', default_type: str = 'geometry', default_srid: int = 4326) -> Union[Tuple[str, int], Tuple[str, NoneType]]:
 955def get_geometry_type_srid(
 956    dtype: str = 'geometry',
 957    default_type: str = 'geometry',
 958    default_srid: int = 4326,
 959) -> Union[Tuple[str, int], Tuple[str, None]]:
 960    """
 961    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).
 962
 963    Parameters
 964    ----------
 965    dtype: str, default 'geometry'
 966        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
 967        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
 968
 969        - `Point`
 970        - `LineString`
 971        - `LinearRing`
 972        - `Polygon`
 973        - `MultiPoint`
 974        - `MultiLineString`
 975        - `MultiPolygon`
 976        - `GeometryCollection`
 977
 978    Returns
 979    -------
 980    A tuple in the form (type, SRID).
 981    Defaults to `(default_type, default_srid)`.
 982
 983    Examples
 984    --------
 985    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
 986    >>> get_geometry_type_srid()
 987    ('geometry', 4326)
 988    >>> get_geometry_type_srid('geometry[]')
 989    ('geometry', 4326)
 990    >>> get_geometry_type_srid('geometry[Point, 0]')
 991    ('Point', 0)
 992    >>> get_geometry_type_srid('geometry[0, Point]')
 993    ('Point', 0)
 994    >>> get_geometry_type_srid('geometry[0]')
 995    ('geometry', 0)
 996    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
 997    ('MultiLineString', 4326)
 998    >>> get_geometry_type_srid('geography')
 999    ('geometry', 4326)
1000    >>> get_geometry_type_srid('geography[POINT]')
1001    ('Point', 4326)
1002    """
1003    from meerschaum.utils.misc import is_int
1004    ### NOTE: PostGIS syntax must also be parsed.
1005    dtype = dtype.replace('(', '[').replace(')', ']')
1006    bare_dtype = dtype.split('[', maxsplit=1)[0]
1007    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
1008    if not modifier:
1009        return default_type, default_srid
1010
1011    parts = [
1012        part.split('=')[-1].strip()
1013        for part in modifier.split(',')
1014    ]
1015    parts_casted = [
1016        (
1017            int(part)
1018            if is_int(part)
1019            else part
1020        )
1021        for part in parts
1022    ]
1023
1024    srid = default_srid
1025    geometry_type = default_type
1026
1027    for part in parts_casted:
1028        if isinstance(part, int):
1029            srid = part
1030            break
1031
1032    for part in parts_casted:
1033        if isinstance(part, str):
1034            geometry_type = part
1035            break
1036
1037    return geometry_type, srid

Given the specified geometry dtype, return a tuple in the form (type, SRID).

Parameters
  • dtype (str, default 'geometry'): Optionally provide a specific geometry syntax (e.g. geometry[MultiLineString, 4326]). You may specify a supported shapely geometry type and an SRID in the dtype modifier:

    • Point
    • LineString
    • LinearRing
    • Polygon
    • MultiPoint
    • MultiLineString
    • MultiPolygon
    • GeometryCollection
Returns
  • A tuple in the form (type, SRID).
  • Defaults to (default_type, default_srid).
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 4326)
>>> get_geometry_type_srid('geometry[]')
('geometry', 4326)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MultiLineString', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 4326)
>>> get_geometry_type_srid('geography[POINT]')
('Point', 4326)
def get_current_timestamp( precision_unit: str = 'microsecond', precision_interval: int = 1, round_to: str = 'down', as_pandas: bool = False, as_int: bool = False, _now: Union[datetime.datetime, int, NoneType] = None) -> 'Union[datetime, pd.Timestamp, int]':
1040def get_current_timestamp(
1041    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
1042    precision_interval: int = 1,
1043    round_to: str = 'down',
1044    as_pandas: bool = False,
1045    as_int: bool = False,
1046    _now: Union[datetime, int, None] = None,
1047) -> 'Union[datetime, pd.Timestamp, int]':
1048    """
1049    Return the current UTC timestamp at the specified precision.
1050
1051    Parameters
1052    ----------
1053    precision_unit: str, default 'us'
1054        The precision of the timestamp to be returned.
1055        Valid values are the following:
1056            - `ns` / `nanosecond`
1057            - `us` / `microsecond`
1058            - `ms` / `millisecond`
1059            - `s` / `sec` / `second`
1060            - `m` / `min` / `minute`
1061            - `h` / `hr` / `hour`
1062            - `d` / `day`
1063
1064    precision_interval: int, default 1
1065        Round the timestamp to the `precision_interval` units.
1066        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
1067        Note: `precision_interval` must be 1 when `precision='nanosecond'`.
1068
1069    round_to: str, default 'down'
1070        The direction to which to round the timestamp.
1071        Available options are `down`, `up`, and `closest`.
1072
1073    as_pandas: bool, default False
1074        If `True`, return a Pandas Timestamp.
1075        This is always true if `precision_unit` is `nanosecond`.
1076
1077    as_int: bool, default False
1078        If `True`, return the timestamp as an integer.
1079        Overrides `as_pandas`.
1080
1081    Returns
1082    -------
1083    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
1084
1085    Examples
1086    --------
1087    >>> get_current_timestamp('ns')
1088    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
1089    >>> get_current_timestamp('ms')
1090    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
1091    """
1092    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1093    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
1094        from meerschaum.utils.misc import items_str
1095        raise ValueError(
1096            f"Unknown precision unit '{precision_unit}'. "
1097            "Accepted values are "
1098            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
1099        )
1100
1101    if not as_int:
1102        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
1103    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None
1104
1105    if true_precision_unit == 'nanosecond':
1106        if precision_interval != 1:
1107            warn("`precision_interval` must be 1 for nanosecond precision.")
1108        now_ts = time.time_ns() if not isinstance(_now, int) else _now
1109        if as_int:
1110            return now_ts
1111        return pd.to_datetime(now_ts, unit='ns', utc=True)
1112
1113    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
1114    delta = timedelta(**{true_precision_unit + 's': precision_interval})
1115    rounded_now = round_time(now, delta, to=round_to)
1116
1117    if as_int:
1118        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])
1119
1120    ts_val = (
1121        pd.to_datetime(rounded_now, utc=True)
1122        if as_pandas
1123        else rounded_now
1124    )
1125
1126    if not as_pandas:
1127        return ts_val
1128
1129    as_unit_precisions = ('microsecond', 'millisecond', 'second')
1130    if true_precision_unit not in as_unit_precisions:
1131        return ts_val
1132
1133    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])

Return the current UTC timestamp at the specified precision.

Parameters
  • precision_unit (str, default 'us'): The precision of the timestamp to be returned. Valid values are the following:

    • ns / nanosecond
    • us / microsecond
    • ms / millisecond
    • s / sec / second
    • m / min / minute
    • h / hr / hour
    • d / day
  • precision_interval (int, default 1): Round the timestamp to the precision_interval units. For example, precision='minute' and precision_interval=15 will round to 15-minute intervals. Note: precision_interval must be 1 when precision='nanosecond'.
  • round_to (str, default 'down'): The direction to which to round the timestamp. Available options are down, up, and closest.
  • as_pandas (bool, default False): If True, return a Pandas Timestamp. This is always true if precision_unit is nanosecond.
  • as_int (bool, default False): If True, return the timestamp as an integer. Overrides as_pandas.
Returns
  • A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms')
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
def is_dtype_special(type_: str) -> bool:
1136def is_dtype_special(type_: str) -> bool:
1137    """
1138    Return whether a dtype should be treated as a special Meerschaum dtype.
1139    This is not the same as a Meerschaum alias.
1140    """
1141    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
1142    if true_type in (
1143        'uuid',
1144        'json',
1145        'bytes',
1146        'numeric',
1147        'datetime',
1148        'geometry',
1149        'geography',
1150        'date',
1151        'bool',
1152    ):
1153        return True
1154
1155    if are_dtypes_equal(true_type, 'datetime'):
1156        return True
1157
1158    if are_dtypes_equal(true_type, 'date'):
1159        return True
1160
1161    if true_type.startswith('numeric'):
1162        return True
1163
1164    if true_type.startswith('bool'):
1165        return True
1166
1167    if true_type.startswith('geometry'):
1168        return True
1169
1170    if true_type.startswith('geography'):
1171        return True
1172
1173    return False

Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.
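
Usage sketch:
>>> is_dtype_special('decimal')
True
>>> is_dtype_special('numeric[10,2]')
True
>>> is_dtype_special('int')
False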

def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
1176def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
1177    """
1178    Get the next precision string in order of value.
1179
1180    Parameters
1181    ----------
1182    precision_unit: str
1183        The precision string (`'nanosecond'`, `'ms'`, etc.).
1184
1185    decrease: bool, default True
1186        If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
1187        If `False`, return the precision unit which is higher.
1188
1189    Returns
1190    -------
1191    A `precision` string which is lower or higher than the given precision unit.
1192
1193    Examples
1194    --------
1195    >>> get_next_precision_unit('nanosecond')
1196    'microsecond'
1197    >>> get_next_precision_unit('ms')
1198    'second'
1199    >>> get_next_precision_unit('hour', decrease=False)
1200    'minute'
1201    """
1202    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1203    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
1204    if not precision_scalar:
1205        raise ValueError(f"Invalid precision unit '{precision_unit}'.")
1206
1207    precisions = sorted(
1208        list(MRSM_PRECISION_UNITS_SCALARS),
1209        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
1210    )
1211
1212    precision_index = precisions.index(true_precision_unit)
1213    new_precision_index = precision_index + (-1 if decrease else 1)
1214    if new_precision_index < 0 or new_precision_index >= len(precisions):
1215        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")
1216
1217    return precisions[new_precision_index]

Get the next precision string in order of value.

Parameters
  • precision_unit (str): The precision string ('nanosecond', 'ms', etc.).
  • decrease (bool, default True): If True, return the precision unit which is lower (e.g. nanosecond -> microsecond). If False, return the precision unit which is higher.
Returns
  • A precision string which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
1220def round_time(
1221    dt: Optional[datetime] = None,
1222    date_delta: Optional[timedelta] = None,
1223    to: 'str' = 'down'
1224) -> datetime:
1225    """
1226    Round a datetime object to a multiple of a timedelta.
1227
1228    Parameters
1229    ----------
1230    dt: Optional[datetime], default None
1231        If `None`, grab the current UTC datetime.
1232
1233    date_delta: Optional[timedelta], default None
1234        If `None`, use a delta of 1 minute.
1235
1236    to: 'str', default 'down'
1237        Available options are `'up'`, `'down'`, and `'closest'`.
1238
1239    Returns
1240    -------
1241    A rounded `datetime` object.
1242
1243    Examples
1244    --------
1245    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
1246    datetime.datetime(2022, 1, 1, 12, 15)
1247    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
1248    datetime.datetime(2022, 1, 1, 12, 16)
1249    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
1250    datetime.datetime(2022, 1, 1, 12, 0)
1251    >>> round_time(
1252    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
1253    ...   timedelta(hours=1),
1254    ...   to = 'closest'
1255    ... )
1256    datetime.datetime(2022, 1, 1, 12, 0)
1257    >>> round_time(
1258    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
1259    ...   timedelta(hours=1),
1260    ...   to = 'closest'
1261    ... )
1262    datetime.datetime(2022, 1, 1, 13, 0)
1263
1264    """
1265    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
1266    if date_delta is None:
1267        date_delta = timedelta(minutes=1)
1268
1269    if dt is None:
1270        dt = datetime.now(timezone.utc).replace(tzinfo=None)
1271
1272    def get_total_microseconds(td: timedelta) -> int:
1273        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds
1274
1275    round_to_microseconds = get_total_microseconds(date_delta)
1276    if round_to_microseconds == 0:
1277        return dt
1278
1279    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
1280    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)
1281
1282    dt_dec = Decimal(dt_total_microseconds)
1283    round_to_dec = Decimal(round_to_microseconds)
1284
1285    div = dt_dec / round_to_dec
1286    if to == 'down':
1287        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
1288    elif to == 'up':
1289        num_intervals = div.to_integral_value(rounding=ROUND_UP)
1290    else:
1291        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)
1292
1293    rounded_dt_total_microseconds = num_intervals * round_to_dec
1294    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds
1295
1296    return dt + timedelta(microseconds=adjustment_microseconds)

Round a datetime object to a multiple of a timedelta.

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)