meerschaum.utils.dtypes

Utility functions for working with data types.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with data types.
   7"""
   8
   9import traceback
  10import json
  11import uuid
  12import time
  13import struct
  14from datetime import timezone, datetime, date, timedelta
  15from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP
  16
  17import meerschaum as mrsm
  18from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple
  19from meerschaum.utils.warnings import warn
  20from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG
  21
### Alternate spellings of dtypes mapped to their canonical Meerschaum names
### (e.g. `'Decimal'` -> `'numeric'`, `'bytea'` -> `'bytes'`).
### Consulted by e.g. `to_pandas_dtype()` and `are_dtypes_equal()` before other matching.
MRSM_ALIAS_DTYPES: Dict[str, str] = {
    'decimal': 'numeric',
    'Decimal': 'numeric',
    'number': 'numeric',
    'jsonl': 'json',
    'JSON': 'json',
    'binary': 'bytes',
    'blob': 'bytes',
    'varbinary': 'bytes',
    'bytea': 'bytes',
    'guid': 'uuid',
    'UUID': 'uuid',
    'geom': 'geometry',
    'geog': 'geography',
    'boolean': 'bool',
    'day': 'date',
}
### Canonical Meerschaum dtype -> pandas dtype string (see `to_pandas_dtype()`).
### Dtypes without a native pandas representation (json, numeric, geometry, geography, uuid)
### are held as Python objects (`'object'`); the `None` key maps an unspecified dtype to `'object'`.
MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
    'json': 'object',
    'numeric': 'object',
    'geometry': 'object',
    'geography': 'object',
    'uuid': 'object',
    'date': 'date32[day][pyarrow]',
    'datetime': 'datetime64[us, UTC]',
    'bool': 'bool[pyarrow]',
    'int': 'int64[pyarrow]',
    'int8': 'int8[pyarrow]',
    'int16': 'int16[pyarrow]',
    'int32': 'int32[pyarrow]',
    'int64': 'int64[pyarrow]',
    'str': 'string',
    'bytes': 'binary[pyarrow]',
    None: 'object',
}

### Number of each precision unit contained in one second
### (e.g. 1000 milliseconds per second, 1/60 minutes per second).
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
    'nanosecond': 1_000_000_000,
    'microsecond': 1_000_000,
    'millisecond': 1000,
    'second': 1,
    'minute': (1 / 60),
    'hour': (1 / 3600),
    'day': (1 / 86400),
}

### Short aliases accepted for precision units, mapped to the full unit name
### (resolved in `to_datetime()` before looking up the abbreviation below).
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
    'ns': 'nanosecond',
    'us': 'microsecond',
    'ms': 'millisecond',
    's': 'second',
    'sec': 'second',
    'm': 'minute',
    'min': 'minute',
    'h': 'hour',
    'hr': 'hour',
    'd': 'day',
    'D': 'day',
}
### Full precision unit name -> abbreviation; `to_datetime()` uses these to build
### `datetime64[<abbr>, UTC]` dtype strings.
### NOTE(review): pandas may only accept 's', 'ms', 'us', 'ns' as datetime64 units —
### confirm 'min', 'hr', and 'D' are handled where they are used.
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
    'nanosecond': 'ns',
    'microsecond': 'us',
    'millisecond': 'ms',
    'second': 's',
    'minute': 'min',
    'hour': 'hr',
    'day': 'D',
}
  90
  91
  92def to_pandas_dtype(dtype: str) -> str:
  93    """
  94    Cast a supported Meerschaum dtype to a Pandas dtype.
  95    """
  96    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
  97    if known_dtype is not None:
  98        return known_dtype
  99
 100    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
 101    if alias_dtype is not None:
 102        return MRSM_PD_DTYPES[alias_dtype]
 103
 104    if dtype.startswith('numeric'):
 105        return MRSM_PD_DTYPES['numeric']
 106
 107    if dtype.startswith('geometry'):
 108        return MRSM_PD_DTYPES['geometry']
 109
 110    if dtype.startswith('geography'):
 111        return MRSM_PD_DTYPES['geography']
 112
 113    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
 114    ### treat it as a SQL db type.
 115    if dtype.split(' ')[0].isupper():
 116        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
 117        return get_pd_type_from_db_type(dtype)
 118
 119    from meerschaum.utils.packages import attempt_import
 120    _ = attempt_import('pyarrow', lazy=False)
 121    pandas = attempt_import('pandas', lazy=False)
 122
 123    try:
 124        return str(pandas.api.types.pandas_dtype(dtype))
 125    except Exception:
 126        warn(
 127            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
 128            + f"{traceback.format_exc()}",
 129            stack=False,
 130        )
 131    return 'object'
 132
 133
 134def are_dtypes_equal(
 135    ldtype: Union[str, Dict[str, str]],
 136    rdtype: Union[str, Dict[str, str]],
 137) -> bool:
 138    """
 139    Determine whether two dtype strings may be considered
 140    equivalent to avoid unnecessary conversions.
 141
 142    Parameters
 143    ----------
 144    ldtype: Union[str, Dict[str, str]]
 145        The left dtype to compare.
 146        May also provide a dtypes dictionary.
 147
 148    rdtype: Union[str, Dict[str, str]]
 149        The right dtype to compare.
 150        May also provide a dtypes dictionary.
 151
 152    Returns
 153    -------
 154    A `bool` indicating whether the two dtypes are to be considered equivalent.
 155    """
 156    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
 157        lkeys = sorted([str(k) for k in ldtype.keys()])
 158        rkeys = sorted([str(k) for k in rdtype.keys()])
 159        for lkey, rkey in zip(lkeys, rkeys):
 160            if lkey != rkey:
 161                return False
 162            ltype = ldtype[lkey]
 163            rtype = rdtype[rkey]
 164            if not are_dtypes_equal(ltype, rtype):
 165                return False
 166        return True
 167
 168    try:
 169        if ldtype == rdtype:
 170            return True
 171    except Exception:
 172        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
 173        return False
 174
 175    ### Sometimes pandas dtype objects are passed.
 176    ldtype = str(ldtype).split('[', maxsplit=1)[0]
 177    rdtype = str(rdtype).split('[', maxsplit=1)[0]
 178
 179    if ldtype in MRSM_ALIAS_DTYPES:
 180        ldtype = MRSM_ALIAS_DTYPES[ldtype]
 181
 182    if rdtype in MRSM_ALIAS_DTYPES:
 183        rdtype = MRSM_ALIAS_DTYPES[rdtype]
 184
 185    json_dtypes = ('json', 'object')
 186    if ldtype in json_dtypes and rdtype in json_dtypes:
 187        return True
 188
 189    numeric_dtypes = ('numeric', 'decimal', 'object')
 190    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
 191        return True
 192
 193    uuid_dtypes = ('uuid', 'object')
 194    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
 195        return True
 196
 197    bytes_dtypes = ('bytes', 'object', 'binary')
 198    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
 199        return True
 200
 201    geometry_dtypes = ('geometry', 'object', 'geography')
 202    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
 203        return True
 204
 205    if ldtype.lower() == rdtype.lower():
 206        return True
 207
 208    datetime_dtypes = ('datetime', 'timestamp')
 209    ldtype_found_dt_prefix = False
 210    rdtype_found_dt_prefix = False
 211    for dt_prefix in datetime_dtypes:
 212        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
 213        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
 214    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
 215        return True
 216
 217    string_dtypes = ('str', 'string', 'object')
 218    if ldtype in string_dtypes and rdtype in string_dtypes:
 219        return True
 220
 221    int_dtypes = (
 222        'int', 'int64', 'int32', 'int16', 'int8',
 223        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
 224    )
 225    int_substrings = ('int',)
 226    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
 227        return True
 228    for substring in int_substrings:
 229        if substring in ldtype.lower() and substring in rdtype.lower():
 230            return True
 231
 232    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
 233    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
 234        return True
 235
 236    bool_dtypes = ('bool', 'boolean')
 237    if ldtype in bool_dtypes and rdtype in bool_dtypes:
 238        return True
 239
 240    date_dtypes = (
 241        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
 242        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
 243    )
 244    if ldtype in date_dtypes and rdtype in date_dtypes:
 245        return True
 246
 247    return False
 248
 249
 250def is_dtype_numeric(dtype: str) -> bool:
 251    """
 252    Determine whether a given `dtype` string
 253    should be considered compatible with the Meerschaum dtype `numeric`.
 254
 255    Parameters
 256    ----------
 257    dtype: str
 258        The pandas-like dtype string.
 259
 260    Returns
 261    -------
 262    A bool indicating the dtype is compatible with `numeric`.
 263    """
 264    dtype_lower = dtype.lower()
 265
 266    acceptable_substrings = ('numeric', 'float', 'double', 'int')
 267    for substring in acceptable_substrings:
 268        if substring in dtype_lower:
 269            return True
 270
 271    return False
 272
 273
 274def attempt_cast_to_numeric(
 275    value: Any,
 276    quantize: bool = False,
 277    precision: Optional[int] = None,
 278    scale: Optional[int] = None,
 279)-> Any:
 280    """
 281    Given a value, attempt to coerce it into a numeric (Decimal).
 282
 283    Parameters
 284    ----------
 285    value: Any
 286        The value to be cast to a Decimal.
 287
 288    quantize: bool, default False
 289        If `True`, quantize the decimal to the specified precision and scale.
 290
 291    precision: Optional[int], default None
 292        If `quantize` is `True`, use this precision.
 293
 294    scale: Optional[int], default None
 295        If `quantize` is `True`, use this scale.
 296
 297    Returns
 298    -------
 299    A `Decimal` if possible, or `value`.
 300    """
 301    if isinstance(value, Decimal):
 302        if quantize and precision and scale:
 303            return quantize_decimal(value, precision, scale)
 304        return value
 305    try:
 306        if value_is_null(value):
 307            return Decimal('NaN')
 308
 309        dec = Decimal(str(value))
 310        if not quantize or not precision or not scale:
 311            return dec
 312        return quantize_decimal(dec, precision, scale)
 313    except Exception:
 314        return value
 315
 316
 317def attempt_cast_to_uuid(value: Any) -> Any:
 318    """
 319    Given a value, attempt to coerce it into a UUID (`uuid4`).
 320    """
 321    if isinstance(value, uuid.UUID):
 322        return value
 323    try:
 324        return (
 325            uuid.UUID(str(value))
 326            if not value_is_null(value)
 327            else None
 328        )
 329    except Exception:
 330        return value
 331
 332
 333def attempt_cast_to_bytes(value: Any) -> Any:
 334    """
 335    Given a value, attempt to coerce it into a bytestring.
 336    """
 337    if isinstance(value, bytes):
 338        return value
 339    try:
 340        return (
 341            deserialize_bytes_string(str(value))
 342            if not value_is_null(value)
 343            else None
 344        )
 345    except Exception:
 346        return value
 347
 348
 349def attempt_cast_to_geometry(value: Any) -> Any:
 350    """
 351    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
 352    """
 353    typ = str(type(value))
 354    if 'pandas' in typ and 'Series' in typ:
 355        if 'GeoSeries' in typ:
 356            return value
 357
 358        gpd = mrsm.attempt_import('geopandas', lazy=False)
 359        if len(value) == 0:
 360            return gpd.GeoSeries([])
 361
 362        ix = value.first_valid_index()
 363        if ix is None:
 364            try:
 365                return gpd.GeoSeries(value)
 366            except Exception:
 367                traceback.print_exc()
 368                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 369
 370        sample_val = value[ix]
 371        sample_typ = str(type(sample_val))
 372        if 'shapely' in sample_typ:
 373            try:
 374                return gpd.GeoSeries(value)
 375            except Exception:
 376                traceback.print_exc()
 377                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 378
 379        sample_is_gpkg = geometry_is_gpkg(sample_val)
 380        if sample_is_gpkg:
 381            try:
 382                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
 383            except Exception:
 384                traceback.print_exc()
 385                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 386
 387        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
 388        try:
 389            return (
 390                gpd.GeoSeries.from_wkt(value)
 391                if sample_is_wkt
 392                else gpd.GeoSeries.from_wkb(value)
 393            )
 394        except Exception:
 395            traceback.print_exc()
 396            return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 397
 398    if 'shapely' in typ:
 399        return value
 400
 401    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
 402        'shapely',
 403        'shapely.wkt',
 404        'shapely.wkb',
 405        lazy=False,
 406    )
 407
 408    if isinstance(value, (dict, list)):
 409        try:
 410            return shapely.from_geojson(json.dumps(value))
 411        except Exception:
 412            return value
 413
 414    value_is_gpkg = geometry_is_gpkg(value)
 415    if value_is_gpkg:
 416        try:
 417            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
 418            return shapely_wkb.loads(wkb_data)
 419        except Exception:
 420            return value
 421
 422    value_is_wkt = geometry_is_wkt(value)
 423    if value_is_wkt is None:
 424        return value
 425
 426    try:
 427        return (
 428            shapely_wkt.loads(value)
 429            if value_is_wkt
 430            else shapely_wkb.loads(value)
 431        )
 432    except Exception:
 433        pass
 434
 435    return value
 436
 437
 438def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
 439    """
 440    Determine whether an input value should be treated as WKT or WKB geometry data.
 441
 442    Parameters
 443    ----------
 444    value: Union[str, bytes]
 445        The input data to be parsed into geometry data.
 446
 447    Returns
 448    -------
 449    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
 450    Return `None` if `value` should be parsed as neither.
 451    """
 452    import re
 453    if not isinstance(value, (str, bytes)):
 454        return None
 455
 456    if isinstance(value, bytes):
 457        return False
 458    
 459    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
 460    if re.match(wkt_pattern, value, re.IGNORECASE):
 461        return True
 462    
 463    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
 464        return False
 465    
 466    return None
 467
 468
 469def geometry_is_gpkg(value: bytes) -> bool:
 470    """
 471    Return whether the input `value` is formatted as GeoPackage WKB.
 472    """
 473    if not isinstance(value, bytes) or len(value) < 2:
 474        return False
 475
 476    return value[0:2] == b'GP'
 477
 478def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
 479    """
 480    Converts GeoPackage WKB to standard WKB by removing the header.
 481    
 482    Parameters
 483    ----------
 484    gpkg_wkb_bytes: bytes
 485        The GeoPackage WKB byte string.
 486        
 487    Returns
 488    -------
 489    A tuple containing the standard WKB bytes, SRID, and flags.
 490    """
 491    magic_number = gpkg_wkb_bytes[0:2]
 492    if magic_number != b'GP':
 493        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")
 494    
 495    try:
 496        header = gpkg_wkb_bytes[0:8]
 497        header_vals = struct.unpack('<ccBBi', header)
 498        flags = header_vals[-2]
 499        srid = header_vals[-1]
 500    except struct.error:
 501        header = gpkg_wkb_bytes[0:6]
 502        header_vals = struct.unpack('<ccBBh', header)
 503        flags = header_vals[-2]
 504        srid = header_vals[-1]
 505
 506    envelope_type = (flags >> 1) & 0x07
 507    envelope_sizes = {
 508        0: 0,
 509        1: 32,
 510        2: 48,
 511        3: 48,
 512        4: 64,
 513    }
 514    header_length = 8 + envelope_sizes.get(envelope_type, 0)
 515    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
 516    return standard_wkb_bytes, srid, flags
 517
 518
 519def value_is_null(value: Any) -> bool:
 520    """
 521    Determine if a value is a null-like string.
 522    """
 523    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
 524
 525
 526def none_if_null(value: Any) -> Any:
 527    """
 528    Return `None` if a value is a null-like string.
 529    """
 530    return (None if value_is_null(value) else value)
 531
 532
 533def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
 534    """
 535    Quantize a given `Decimal` to a known scale and precision.
 536
 537    Parameters
 538    ----------
 539    x: Decimal
 540        The `Decimal` to be quantized.
 541
 542    precision: int
 543        The total number of significant digits.
 544
 545    scale: int
 546        The number of significant digits after the decimal point.
 547
 548    Returns
 549    -------
 550    A `Decimal` quantized to the specified scale and precision.
 551    """
 552    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
 553    try:
 554        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
 555    except InvalidOperation:
 556        pass
 557
 558    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
 559
 560
 561def serialize_decimal(
 562    x: Any,
 563    quantize: bool = False,
 564    precision: Optional[int] = None,
 565    scale: Optional[int] = None,
 566) -> Any:
 567    """
 568    Return a quantized string of an input decimal.
 569
 570    Parameters
 571    ----------
 572    x: Any
 573        The potential decimal to be serialized.
 574
 575    quantize: bool, default False
 576        If `True`, quantize the incoming Decimal to the specified scale and precision
 577        before serialization.
 578
 579    precision: Optional[int], default None
 580        The precision of the decimal to be quantized.
 581
 582    scale: Optional[int], default None
 583        The scale of the decimal to be quantized.
 584
 585    Returns
 586    -------
 587    A string of the input decimal or the input if not a Decimal.
 588    """
 589    if not isinstance(x, Decimal):
 590        return x
 591
 592    if value_is_null(x):
 593        return None
 594
 595    if quantize and scale and precision:
 596        x = quantize_decimal(x, precision, scale)
 597
 598    return f"{x:f}"
 599
 600
 601def coerce_timezone(
 602    dt: Any,
 603    strip_utc: bool = False,
 604) -> Any:
 605    """
 606    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
 607    return a UTC timestamp (strip timezone if `strip_utc` is `True`.
 608    """
 609    if dt is None:
 610        return None
 611
 612    if isinstance(dt, int):
 613        return dt
 614
 615    if isinstance(dt, str):
 616        dateutil_parser = mrsm.attempt_import('dateutil.parser')
 617        try:
 618            dt = dateutil_parser.parse(dt)
 619        except Exception:
 620            return dt
 621
 622    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
 623    if dt_is_series:
 624        pandas = mrsm.attempt_import('pandas', lazy=False)
 625
 626        if (
 627            pandas.api.types.is_datetime64_any_dtype(dt) and (
 628                (dt.dt.tz is not None and not strip_utc)
 629                or
 630                (dt.dt.tz is None and strip_utc)
 631            )
 632        ):
 633            return dt
 634
 635        dt_series = to_datetime(dt, coerce_utc=False)
 636        if dt_series.dt.tz is None:
 637            dt_series = dt_series.dt.tz_localize(timezone.utc)
 638        if strip_utc:
 639            try:
 640                if dt_series.dt.tz is not None:
 641                    dt_series = dt_series.dt.tz_localize(None)
 642            except Exception:
 643                pass
 644
 645        return dt_series
 646
 647    if dt.tzinfo is None:
 648        if strip_utc:
 649            return dt
 650        return dt.replace(tzinfo=timezone.utc)
 651
 652    utc_dt = dt.astimezone(timezone.utc)
 653    if strip_utc:
 654        return utc_dt.replace(tzinfo=None)
 655    return utc_dt
 656
 657
 658def to_datetime(
 659    dt_val: Any,
 660    as_pydatetime: bool = False,
 661    coerce_utc: bool = True,
 662    precision_unit: Optional[str] = None,
 663) -> Any:
 664    """
 665    Wrap `pd.to_datetime()` and add support for out-of-bounds values.
 666
 667    Parameters
 668    ----------
 669    dt_val: Any
 670        The value to coerce to Pandas Timestamps.
 671
 672    as_pydatetime: bool, default False
 673        If `True`, return a Python datetime object.
 674
 675    coerce_utc: bool, default True
 676        If `True`, ensure the value has UTC tzinfo.
 677
 678    precision_unit: Optional[str], default None
 679        If provided, enforce the provided precision unit.
 680    """
 681    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
 682    is_dask = 'dask' in getattr(dt_val, '__module__', '')
 683    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
 684    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
 685    pd = pandas if dd is None else dd
 686    enforce_precision = precision_unit is not None
 687    precision_unit = precision_unit or 'microsecond'
 688    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
 689    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
 690    if not precision_abbreviation:
 691        raise ValueError(f"Invalid precision '{precision_unit}'.")
 692
 693    def parse(x: Any) -> Any:
 694        try:
 695            return dateutil_parser.parse(x)
 696        except Exception:
 697            return x
 698
 699    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
 700        dtype_check_against = (
 701            f"datetime64[{precision_abbreviation}, UTC]"
 702            if with_utc
 703            else f"datetime64[{precision_abbreviation}]"
 704        )
 705        return (
 706            dtype_to_check == dtype_check_against
 707            if enforce_precision
 708            else (
 709                dtype_to_check.startswith('datetime64[')
 710                and (
 711                    ('utc' in dtype_to_check.lower())
 712                    if with_utc
 713                    else ('utc' not in dtype_to_check.lower())
 714                )
 715            )
 716        )
 717
 718    if isinstance(dt_val, pd.Timestamp):
 719        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
 720        return (
 721            coerce_timezone(dt_val_to_return)
 722            if coerce_utc
 723            else dt_val_to_return
 724        )
 725
 726    if dt_is_series:
 727        changed_tz = False
 728        original_tz = None
 729        dtype = str(getattr(dt_val, 'dtype', 'object'))
 730        if (
 731            are_dtypes_equal(dtype, 'datetime')
 732            and 'utc' not in dtype.lower()
 733            and hasattr(dt_val, 'dt')
 734        ):
 735            original_tz = dt_val.dt.tz
 736            dt_val = dt_val.dt.tz_localize(timezone.utc)
 737            changed_tz = True
 738            dtype = str(getattr(dt_val, 'dtype', 'object'))
 739        try:
 740            new_dt_series = (
 741                dt_val
 742                if check_dtype(dtype, with_utc=True)
 743                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
 744            )
 745        except pd.errors.OutOfBoundsDatetime:
 746            try:
 747                next_precision = get_next_precision_unit(true_precision_unit)
 748                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
 749                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
 750            except Exception:
 751                new_dt_series = None
 752        except ValueError:
 753            new_dt_series = None
 754        except TypeError:
 755            try:
 756                new_dt_series = (
 757                    new_dt_series
 758                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
 759                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
 760                )
 761            except Exception:
 762                new_dt_series = None
 763
 764        if new_dt_series is None:
 765            new_dt_series = dt_val.apply(lambda x: parse(str(x)))
 766
 767        if coerce_utc:
 768            return coerce_timezone(new_dt_series)
 769
 770        if changed_tz:
 771            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
 772        return new_dt_series
 773
 774    try:
 775        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
 776        if new_dt_val.unit != precision_abbreviation:
 777            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
 778        if as_pydatetime:
 779            return new_dt_val.to_pydatetime()
 780        return new_dt_val
 781    except (pd.errors.OutOfBoundsDatetime, ValueError):
 782        pass
 783
 784    new_dt_val = parse(dt_val)
 785    if not coerce_utc:
 786        return new_dt_val
 787    return coerce_timezone(new_dt_val)
 788
 789
 790def serialize_bytes(data: bytes) -> str:
 791    """
 792    Return the given bytes as a base64-encoded string.
 793    """
 794    import base64
 795    if not isinstance(data, bytes) and value_is_null(data):
 796        return data
 797    return base64.b64encode(data).decode('utf-8')
 798
 799
 800def serialize_geometry(
 801    geom: Any,
 802    geometry_format: str = 'wkb_hex',
 803    srid: Optional[int] = None,
 804) -> Union[str, Dict[str, Any], bytes, None]:
 805    """
 806    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 
 807
 808    Parameters
 809    ----------
 810    geom: Any
 811        The potential geometry data to be serialized.
 812
 813    geometry_format: str, default 'wkb_hex'
 814        The serialization format for geometry data.
 815        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
 816
 817    srid: Optional[int], default None
 818        If provided, use this as the source CRS when serializing to GeoJSON.
 819
 820    Returns
 821    -------
 822    A string containing the geometry data, or bytes, or a dictionary, or None.
 823    """
 824    if value_is_null(geom):
 825        return None
 826
 827    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
 828        'shapely', 'shapely.ops', 'pyproj', 'numpy',
 829        lazy=False,
 830    )
 831    if geometry_format == 'geojson':
 832        if srid:
 833            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
 834            geom = shapely_ops.transform(transformer.transform, geom)
 835        geojson_str = shapely.to_geojson(geom)
 836        return json.loads(geojson_str)
 837
 838    if not hasattr(geom, 'wkb_hex'):
 839        return str(geom)
 840
 841    byte_order = 1 if np.little_endian else 0
 842
 843    if geometry_format.startswith("wkb"):
 844        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
 845    elif geometry_format == 'gpkg_wkb':
 846        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
 847        flags = (
 848            ((byte_order & 0x01) | (0x20))
 849            if geom.is_empty
 850            else (byte_order & 0x01)
 851        )
 852        srid_val = srid or -1
 853        header = struct.pack(
 854            '<ccBBi',
 855            b'G', b'P',
 856            0,
 857            flags,
 858            srid_val
 859        )
 860        return header + wkb_data
 861
 862    return shapely.to_wkt(geom)
 863
 864
 865def deserialize_geometry(geom_wkb: Union[str, bytes]):
 866    """
 867    Deserialize a WKB string into a shapely geometry object.
 868    """
 869    shapely = mrsm.attempt_import('shapely', lazy=False)
 870    return shapely.wkb.loads(geom_wkb)
 871
 872
 873def project_geometry(geom, srid: int, to_srid: int = 4326):
 874    """
 875    Project a shapely geometry object to a new CRS (SRID).
 876    """
 877    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
 878    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
 879    return shapely_ops.transform(transformer.transform, geom)
 880
 881
 882def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
 883    """
 884    Given a serialized ASCII string of bytes data, return the original bytes.
 885    The input data may either be base64- or hex-encoded.
 886
 887    Parameters
 888    ----------
 889    data: Optional[str]
 890        The string to be deserialized into bytes.
 891        May be base64- or hex-encoded (prefixed with `'\\x'`).
 892
 893    force_hex: bool = False
 894        If `True`, treat the input string as hex-encoded.
 895        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
 896        This will still strip the leading `'\\x'` prefix if present.
 897
 898    Returns
 899    -------
 900    The original bytes used to produce the encoded string `data`.
 901    """
 902    if not isinstance(data, str) and value_is_null(data):
 903        return data
 904
 905    import binascii
 906    import base64
 907
 908    is_hex = force_hex or data.startswith('\\x')
 909
 910    if is_hex:
 911        if data.startswith('\\x'):
 912            data = data[2:]
 913        return binascii.unhexlify(data)
 914
 915    return base64.b64decode(data)
 916
 917
 918def deserialize_base64(data: str) -> bytes:
 919    """
 920    Return the original bytestring from the given base64-encoded string.
 921    """
 922    import base64
 923    return base64.b64decode(data)
 924
 925
 926def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
 927    """
 928    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
 929    """
 930    import binascii
 931    if not isinstance(data, bytes) and value_is_null(data):
 932        return data
 933    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
 934
 935
 936def serialize_datetime(dt: datetime) -> Union[str, None]:
 937    """
 938    Serialize a datetime object into JSON (ISO format string).
 939
 940    Examples
 941    --------
 942    >>> import json
 943    >>> from datetime import datetime
 944    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
 945    '{"a": "2022-01-01T00:00:00Z"}'
 946
 947    """
 948    if not hasattr(dt, 'isoformat'):
 949        return None
 950
 951    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
 952    return dt.isoformat() + tz_suffix
 953
 954
 955def serialize_date(d: date) -> Union[str, None]:
 956    """
 957    Serialize a date object into its ISO representation.
 958    """
 959    return d.isoformat() if hasattr(d, 'isoformat') else None
 960
 961
 962def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
 963    """
 964    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
 965
 966    Parameters
 967    ----------
 968    x: Any
 969        The value to serialize.
 970
 971    default_to_str: bool, default True
 972        If `True`, return a string of `x` if x is not a designated type.
 973        Otherwise return x.
 974
 975    Returns
 976    -------
 977    A serialized version of x, or x.
 978    """
 979    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
 980        return x.meta
 981
 982    if hasattr(x, 'tzinfo'):
 983        return serialize_datetime(x)
 984
 985    if hasattr(x, 'isoformat'):
 986        return serialize_date(x)
 987
 988    if isinstance(x, bytes):
 989        return serialize_bytes(x)
 990
 991    if isinstance(x, Decimal):
 992        return serialize_decimal(x)
 993
 994    if 'shapely' in str(type(x)):
 995        return serialize_geometry(x)
 996
 997    if value_is_null(x):
 998        return None
 999
1000    if isinstance(x, (dict, list, tuple)):
1001        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
1002
1003    return str(x) if default_to_str else x
1004
1005
def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: Union[int, str] = 0,
) -> Tuple[str, Union[int, str, None]]:
    """
    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: str, default 'geometry'
        The geometry dtype to parse, optionally with a modifier
        (e.g. `geometry[MultiLineString, 4326]`).
        PostGIS-style parentheses (e.g. `geometry(Point, 4326)`) and `key=value`
        parts (e.g. `geometry[type=Point, srid=4326]`) are also accepted.
        The modifier may contain a geometry type (e.g. a `shapely` type below)
        and an SRID, in either order:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

        The geometry type is returned as written (its case is not normalized).
        Integer parts, and parts containing `:` (e.g. `ESRI:102003`), are taken as the SRID.

    default_type: str, default 'geometry'
        The type returned when the modifier does not specify one.

    default_srid: Union[int, str], default 0
        The SRID returned when the modifier does not specify one.

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[MultiLineString, 4326]')
    ('MultiLineString', 4326)
    >>> get_geometry_type_srid('geometry(Point, 4326)')
    ('Point', 4326)
    >>> get_geometry_type_srid('geography')
    ('geometry', 0)
    >>> get_geometry_type_srid('geography[POINT]')
    ('POINT', 0)
    >>> get_geometry_type_srid('geometry[POINT, ESRI:102003]')
    ('POINT', 'ESRI:102003')
    """
    from meerschaum.utils.misc import is_int
    ### NOTE: PostGIS syntax (parentheses) must also be parsed.
    dtype = dtype.replace('(', '[').replace(')', ']')

    ### Everything after the first '[' is the modifier.
    ### `partition` also copes with an empty bare dtype (e.g. '[Point]'),
    ### where splitting on the bare dtype would raise "empty separator".
    _, _, modifier = dtype.partition('[')
    modifier = modifier.lstrip('[').rstrip(']')

    ### Keep only the value of `key=value` parts and drop blank parts (e.g. 'geometry[ ]').
    parts = [
        part.split('=')[-1].strip()
        for part in modifier.split(',')
    ]
    parts = [part for part in parts if part]
    if not parts:
        return default_type, default_srid

    parts_casted = [
        (
            int(part)
            if is_int(part)
            else part
        )
        for part in parts
    ]

    srid = default_srid
    geometry_type = default_type

    ### The first integer (or authority-prefixed code like 'ESRI:102003') is the SRID.
    for part in parts_casted:
        if isinstance(part, int) or ':' in str(part):
            srid = part
            break

    ### The first remaining string part is the geometry type.
    for part in parts_casted:
        if isinstance(part, str) and part != srid:
            geometry_type = part
            break

    return geometry_type, srid
1091
1092
def get_current_timestamp(
    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
    precision_interval: int = 1,
    round_to: str = 'down',
    as_pandas: bool = False,
    as_int: bool = False,
    _now: Union[datetime, int, None] = None,
) -> 'Union[datetime, pd.Timestamp, int]':
    """
    Return the current UTC timestamp, rounded to the requested precision.

    Parameters
    ----------
    precision_unit: str, default `STATIC_CONFIG['dtypes']['datetime']['default_precision_unit']`
        The precision of the timestamp to be returned.
        Valid values are the following:
            - `ns` / `nanosecond`
            - `us` / `microsecond`
            - `ms` / `millisecond`
            - `s` / `sec` / `second`
            - `m` / `min` / `minute`
            - `h` / `hr` / `hour`
            - `d` / `D` / `day`

    precision_interval: int, default 1
        Round the timestamp to multiples of `precision_interval` units.
        For example, `precision_unit='minute'` and `precision_interval=15`
        will round to 15-minute intervals.
        Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`;
        other values only emit a warning and are ignored.

    round_to: str, default 'down'
        The direction to which to round the timestamp.
        Available options are `down`, `up`, and `closest`.

    as_pandas: bool, default False
        If `True`, return a Pandas Timestamp.
        This is always true if the unit is `nanosecond` (unless `as_int` is set).

    as_int: bool, default False
        If `True`, return the timestamp as an integer number of `precision_unit`
        units since the Unix epoch.
        Overrides `as_pandas`.

    _now: Union[datetime, int, None], default None
        Internal override for the current time: an `int` of epoch nanoseconds
        (used only for nanosecond precision) or a `datetime` (used for all other units).

    Returns
    -------
    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.

    Raises
    ------
    ValueError
        If `precision_unit` is not a recognized unit or alias.

    Examples
    --------
    >>> get_current_timestamp('ns')
    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
    >>> get_current_timestamp('ms', as_pandas=True)
    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
        from meerschaum.utils.misc import items_str
        raise ValueError(
            f"Unknown precision unit '{precision_unit}'. "
            "Accepted values are "
            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
        )

    if not as_int:
        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None

    if true_precision_unit == 'nanosecond':
        if precision_interval != 1:
            warn("`precision_interval` must be 1 for nanosecond precision.")
        now_ts = time.time_ns() if not isinstance(_now, int) else _now
        if as_int:
            return now_ts
        return pd.to_datetime(now_ts, unit='ns', utc=True)

    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
    delta = timedelta(**{true_precision_unit + 's': precision_interval})
    rounded_now = round_time(now, delta, to=round_to)

    if as_int:
        if rounded_now.tzinfo is None:
            ### Naive values keep the legacy `timestamp()` semantics.
            return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])

        ### Count whole units since the epoch with exact integer arithmetic.
        ### `timestamp() * scalar` goes through a float, which can land just below an
        ### exact unit boundary and then be truncated by `int()` to the previous unit.
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        unit_delta = timedelta(**{true_precision_unit + 's': 1})
        return (rounded_now - epoch) // unit_delta

    if not as_pandas:
        return rounded_now

    ts_val = pd.to_datetime(rounded_now, utc=True)

    ### Only sub-minute units map onto a pandas Timestamp resolution.
    as_unit_precisions = ('microsecond', 'millisecond', 'second')
    if true_precision_unit not in as_unit_precisions:
        return ts_val

    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
1187
1188
def is_dtype_special(type_: str) -> bool:
    """
    Return whether `type_` should be handled as one of Meerschaum's special dtypes
    (`uuid`, `json`, `bytes`, `numeric`, `datetime`, `geometry`, `geography`,
    `date`, `bool`), including modified forms such as `numeric[10, 2]`.
    Aliases are resolved via `MRSM_ALIAS_DTYPES` first.
    Being special is not the same as being a Meerschaum alias.
    """
    resolved_type = MRSM_ALIAS_DTYPES.get(type_, type_)

    ### Exact matches on the canonical special dtypes.
    if resolved_type in (
        'uuid', 'json', 'bytes', 'numeric', 'datetime',
        'geometry', 'geography', 'date', 'bool',
    ):
        return True

    ### Datetime- and date-like dtypes (e.g. 'datetime64[ns, UTC]').
    for base_dtype in ('datetime', 'date'):
        if are_dtypes_equal(resolved_type, base_dtype):
            return True

    ### Parameterized special dtypes (e.g. 'numeric[10, 2]', 'geometry[Point]').
    return resolved_type.startswith(('numeric', 'bool', 'geometry', 'geography'))
1227
1228
def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the next precision string in order of value.

    Units are ordered by how many of them fit in one second:
    `day` < `hour` < `minute` < `second` < `millisecond` < `microsecond` < `nanosecond`.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.). Aliases are accepted.

    decrease: bool, default True
        If `True`, return the next coarser (lower) precision unit
        (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the next finer (higher) precision unit
        (e.g. `hour` -> `minute`).

    Returns
    -------
    The full name of the neighboring precision unit (never an alias).

    Raises
    ------
    ValueError
        If `precision_unit` is unknown, or no unit exists in the requested direction.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
    if not precision_scalar:
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    ### Coarsest first (fewest units per second), finest last.
    precisions = sorted(
        list(MRSM_PRECISION_UNITS_SCALARS),
        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
    )

    precision_index = precisions.index(true_precision_unit)
    new_precision_index = precision_index + (-1 if decrease else 1)
    if new_precision_index < 0 or new_precision_index >= len(precisions):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return precisions[new_precision_index]
1271
1272
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.

    Multiples are counted from `datetime.min` on the wall-clock value
    (the timezone is ignored while rounding); any `tzinfo` on `dt` is kept on the result.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime (as a naive datetime).

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.
        A zero-length delta returns `dt` unchanged.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'` (ties round up).
        Any other value is treated as `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> from datetime import datetime, timedelta
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    def get_total_microseconds(td: timedelta) -> int:
        """Return the exact length of `td` in whole microseconds."""
        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds

    round_to_microseconds = get_total_microseconds(date_delta)
    if round_to_microseconds == 0:
        return dt

    ### Work on the naive wall-clock value so aware and naive inputs round alike.
    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)

    ### Decimal keeps the division exact for the rounding-mode step.
    dt_dec = Decimal(dt_total_microseconds)
    round_to_dec = Decimal(round_to_microseconds)

    div = dt_dec / round_to_dec
    if to == 'down':
        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
    elif to == 'up':
        num_intervals = div.to_integral_value(rounding=ROUND_UP)
    else:
        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)

    rounded_dt_total_microseconds = num_intervals * round_to_dec
    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds

    ### Shift the original (possibly aware) value so its tzinfo is preserved.
    return dt + timedelta(microseconds=adjustment_microseconds)
# Alternate spellings of Meerschaum dtypes, mapped to their canonical dtype names.
MRSM_ALIAS_DTYPES: Dict[str, str] = {'decimal': 'numeric', 'Decimal': 'numeric', 'number': 'numeric', 'jsonl': 'json', 'JSON': 'json', 'binary': 'bytes', 'blob': 'bytes', 'varbinary': 'bytes', 'bytea': 'bytes', 'guid': 'uuid', 'UUID': 'uuid', 'geom': 'geometry', 'geog': 'geography', 'boolean': 'bool', 'day': 'date'}
# Canonical Meerschaum dtypes mapped to the Pandas dtype strings used to hold them;
# the `None` key maps to 'object'.
MRSM_PD_DTYPES: Dict[Optional[str], str] = {'json': 'object', 'numeric': 'object', 'geometry': 'object', 'geography': 'object', 'uuid': 'object', 'date': 'date32[day][pyarrow]', 'datetime': 'datetime64[us, UTC]', 'bool': 'bool[pyarrow]', 'int': 'int64[pyarrow]', 'int8': 'int8[pyarrow]', 'int16': 'int16[pyarrow]', 'int32': 'int32[pyarrow]', 'int64': 'int64[pyarrow]', 'str': 'string', 'bytes': 'binary[pyarrow]', None: 'object'}
# How many of each precision unit fit in one second
# (e.g. 1000 milliseconds per second, 1/60 of a minute per second).
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {'nanosecond': 1000000000, 'microsecond': 1000000, 'millisecond': 1000, 'second': 1, 'minute': 0.016666666666666666, 'hour': 0.0002777777777777778, 'day': 1.1574074074074073e-05}
# Short and alternate spellings of precision units, mapped to their full names.
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {'ns': 'nanosecond', 'us': 'microsecond', 'ms': 'millisecond', 's': 'second', 'sec': 'second', 'm': 'minute', 'min': 'minute', 'h': 'hour', 'hr': 'hour', 'd': 'day', 'D': 'day'}
# Full precision-unit names mapped to short abbreviations (e.g. passed to pandas `Timestamp.as_unit`).
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {'nanosecond': 'ns', 'microsecond': 'us', 'millisecond': 'ms', 'second': 's', 'minute': 'min', 'hour': 'hr', 'day': 'D'}
def to_pandas_dtype(dtype: str) -> str:
 93def to_pandas_dtype(dtype: str) -> str:
 94    """
 95    Cast a supported Meerschaum dtype to a Pandas dtype.
 96    """
 97    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
 98    if known_dtype is not None:
 99        return known_dtype
100
101    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
102    if alias_dtype is not None:
103        return MRSM_PD_DTYPES[alias_dtype]
104
105    if dtype.startswith('numeric'):
106        return MRSM_PD_DTYPES['numeric']
107
108    if dtype.startswith('geometry'):
109        return MRSM_PD_DTYPES['geometry']
110
111    if dtype.startswith('geography'):
112        return MRSM_PD_DTYPES['geography']
113
114    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
115    ### treat it as a SQL db type.
116    if dtype.split(' ')[0].isupper():
117        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
118        return get_pd_type_from_db_type(dtype)
119
120    from meerschaum.utils.packages import attempt_import
121    _ = attempt_import('pyarrow', lazy=False)
122    pandas = attempt_import('pandas', lazy=False)
123
124    try:
125        return str(pandas.api.types.pandas_dtype(dtype))
126    except Exception:
127        warn(
128            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
129            + f"{traceback.format_exc()}",
130            stack=False,
131        )
132    return 'object'

Cast a supported Meerschaum dtype to a Pandas dtype.

def are_dtypes_equal( ldtype: Union[str, Dict[str, str]], rdtype: Union[str, Dict[str, str]]) -> bool:
def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Bracketed modifiers (e.g. `numeric[10, 2]`, `int64[pyarrow]`) are ignored,
    Meerschaum aliases are resolved, and broad families (e.g. all integer types,
    all datetime/timestamp types) compare as equal.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    Two dtypes dictionaries are equal only if they have the same columns
    (compared as strings) and each column's dtypes are equivalent.
    """
    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
        ### Compare keys as strings and require identical column sets:
        ### zipping sorted keys alone would ignore extra columns in the longer dict.
        lmap = {str(key): typ for key, typ in ldtype.items()}
        rmap = {str(key): typ for key, typ in rdtype.items()}
        if lmap.keys() != rmap.keys():
            return False
        return all(are_dtypes_equal(lmap[key], rmap[key]) for key in sorted(lmap))

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    if ldtype in MRSM_ALIAS_DTYPES:
        ldtype = MRSM_ALIAS_DTYPES[ldtype]

    if rdtype in MRSM_ALIAS_DTYPES:
        rdtype = MRSM_ALIAS_DTYPES[rdtype]

    json_dtypes = ('json', 'object')
    if ldtype in json_dtypes and rdtype in json_dtypes:
        return True

    numeric_dtypes = ('numeric', 'decimal', 'object')
    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
        return True

    uuid_dtypes = ('uuid', 'object')
    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
        return True

    bytes_dtypes = ('bytes', 'object', 'binary')
    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
        return True

    geometry_dtypes = ('geometry', 'object', 'geography')
    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
        return True

    if ldtype.lower() == rdtype.lower():
        return True

    datetime_dtypes = ('datetime', 'timestamp')
    ldtype_found_dt_prefix = False
    rdtype_found_dt_prefix = False
    for dt_prefix in datetime_dtypes:
        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
        return True

    string_dtypes = ('str', 'string', 'object')
    if ldtype in string_dtypes and rdtype in string_dtypes:
        return True

    int_dtypes = (
        'int', 'int64', 'int32', 'int16', 'int8',
        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
    )
    int_substrings = ('int',)
    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
        return True
    for substring in int_substrings:
        if substring in ldtype.lower() and substring in rdtype.lower():
            return True

    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
        return True

    bool_dtypes = ('bool', 'boolean')
    if ldtype in bool_dtypes and rdtype in bool_dtypes:
        return True

    ### NOTE: modifiers were stripped above, so only the bare names can match here.
    date_dtypes = (
        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
    )
    if ldtype in date_dtypes and rdtype in date_dtypes:
        return True

    return False

Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.

Parameters
  • ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
  • rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
  • A bool indicating whether the two dtypes are to be considered equivalent.
def is_dtype_numeric(dtype: str) -> bool:
def is_dtype_numeric(dtype: str) -> bool:
    """
    Determine whether a given `dtype` string
    should be considered compatible with the Meerschaum dtype `numeric`.

    A dtype is numeric-compatible if (case-insensitively) it contains `numeric`,
    `float`, `double`, or `int`. Geometry/geography dtypes are never numeric,
    even if a type modifier such as `Point` happens to contain `int`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string. Non-string dtype objects are converted with `str()`.

    Returns
    -------
    A bool indicating the dtype is compatible with `numeric`.
    """
    dtype_lower = str(dtype).lower()

    ### 'point' (e.g. 'geometry[Point]', 'GEOMETRY(MULTIPOINT)') contains 'int',
    ### so rule out spatial dtypes before the substring checks.
    if dtype_lower.startswith(('geom', 'geog')):
        return False

    acceptable_substrings = ('numeric', 'float', 'double', 'int')
    return any(substring in dtype_lower for substring in acceptable_substrings)

Determine whether a given dtype string should be considered compatible with the Meerschaum dtype numeric.

Parameters
  • dtype (str): The pandas-like dtype string.
Returns
  • A bool indicating the dtype is compatible with numeric.
def attempt_cast_to_numeric( value: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
)-> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.
        Quantization happens only when `precision` is set (non-zero) and `scale`
        is not `None`; a `scale` of 0 is valid (e.g. `numeric[10, 0]`).

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    Null-like values become `Decimal('NaN')`.

    Raises
    ------
    ValueError
        If `value` is already a `Decimal` and cannot be quantized to the
        requested precision and scale (other inputs fall back to `value`).
    """
    ### Check `scale` against None: a scale of 0 is falsy but meaningful.
    should_quantize = bool(quantize and precision and scale is not None)

    if isinstance(value, Decimal):
        if should_quantize:
            return quantize_decimal(value, precision, scale)
        return value

    try:
        if value_is_null(value):
            return Decimal('NaN')

        ### Go through `str` so floats keep their shortest repr instead of binary noise.
        dec = Decimal(str(value))
        if not should_quantize:
            return dec
        return quantize_decimal(dec, precision, scale)
    except Exception:
        return value

Given a value, attempt to coerce it into a numeric (Decimal).

Parameters
  • value (Any): The value to be cast to a Decimal.
  • quantize (bool, default False): If True, quantize the decimal to the specified precision and scale.
  • precision (Optional[int], default None): If quantize is True, use this precision.
  • scale (Optional[int], default None): If quantize is True, use this scale.
Returns
  • A Decimal if possible, or value.
def attempt_cast_to_uuid(value: Any) -> Any:
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Try to coerce `value` into a `uuid.UUID` (any UUID version).

    UUIDs are returned as-is and null-like values become `None`.
    If parsing fails, `value` is returned unchanged.
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        if value_is_null(value):
            return None
        return uuid.UUID(str(value))
    except Exception:
        return value

Given a value, attempt to coerce it into a UUID (uuid.UUID, any version).

def attempt_cast_to_bytes(value: Any) -> Any:
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Try to coerce `value` into `bytes`.

    Bytes are returned as-is and null-like values become `None`.
    Anything else is stringified and decoded with `deserialize_bytes_string`.
    If decoding fails, `value` is returned unchanged.
    """
    if isinstance(value, bytes):
        return value
    try:
        if value_is_null(value):
            return None
        return deserialize_bytes_string(str(value))
    except Exception:
        return value

Given a value, attempt to coerce it into a bytestring.

def attempt_cast_to_geometry(value: Any) -> Any:
def attempt_cast_to_geometry(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.

    Pandas Series are converted into a `geopandas.GeoSeries`. The first non-null
    value decides whether the Series is parsed as GeoPackage WKB, WKT, or WKB; if
    bulk parsing fails, each element is cast individually.
    Scalars are parsed one at a time. Values that cannot be parsed are returned unchanged.

    Parameters
    ----------
    value: Any
        A scalar (WKT string, WKB bytes or hex string, GeoPackage WKB bytes,
        GeoJSON-like dict/list, or `shapely` object) or a Pandas Series of such values.

    Returns
    -------
    A `shapely` geometry (or a `GeoSeries` for Series input) if possible, otherwise `value`.
    """
    typ = str(type(value))
    if 'pandas' in typ and 'Series' in typ:
        if 'GeoSeries' in typ:
            return value

        gpd = mrsm.attempt_import('geopandas', lazy=False)
        if len(value) == 0:
            return gpd.GeoSeries([])

        def _cast_each():
            """Cast element-by-element, keeping the Series' index and name for alignment."""
            ### Reads the *current* `value`, which may already be GPKG-stripped below.
            return gpd.GeoSeries(
                [attempt_cast_to_geometry(val) for val in value],
                index=value.index,
                name=value.name,
            )

        ix = value.first_valid_index()
        if ix is None:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                return _cast_each()

        sample_val = value[ix]
        sample_typ = str(type(sample_val))
        if 'shapely' in sample_typ:
            try:
                return gpd.GeoSeries(value)
            except Exception:
                traceback.print_exc()
                return _cast_each()

        sample_is_gpkg = geometry_is_gpkg(sample_val)
        if sample_is_gpkg:
            try:
                ### Strip GeoPackage headers so the values become standard WKB.
                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
            except Exception:
                traceback.print_exc()
                return _cast_each()

        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
        try:
            return (
                gpd.GeoSeries.from_wkt(value)
                if sample_is_wkt
                else gpd.GeoSeries.from_wkb(value)
            )
        except Exception:
            traceback.print_exc()
            return _cast_each()

    if 'shapely' in typ:
        return value

    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
        'shapely',
        'shapely.wkt',
        'shapely.wkb',
        lazy=False,
    )

    ### Dicts and lists are treated as GeoJSON.
    if isinstance(value, (dict, list)):
        try:
            return shapely.from_geojson(json.dumps(value))
        except Exception:
            return value

    value_is_gpkg = geometry_is_gpkg(value)
    if value_is_gpkg:
        try:
            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
            return shapely_wkb.loads(wkb_data)
        except Exception:
            return value

    value_is_wkt = geometry_is_wkt(value)
    if value_is_wkt is None:
        return value

    try:
        return (
            shapely_wkt.loads(value)
            if value_is_wkt
            else shapely_wkb.loads(value)
        )
    except Exception:
        pass

    return value

Given a value, attempt to coerce it into a shapely (geometry) object.

def geometry_is_wkt(value: Union[str, bytes]) -> Optional[bool]:
def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
    """
    Determine whether an input value should be treated as WKT or WKB geometry data.

    WKT is recognized case-insensitively, including dimension modifiers
    (`POINT Z (1 2 3)`, `ZM`, `M`), `EMPTY` geometries, and text spanning several lines.
    A non-empty, even-length hexadecimal string is treated as hex-encoded WKB.

    Parameters
    ----------
    value: Union[str, bytes]
        The input data to be parsed into geometry data.

    Returns
    -------
    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
    Return `None` if `value` should be parsed as neither (including the empty string).
    """
    import re
    if not isinstance(value, (str, bytes)):
        return None

    if isinstance(value, bytes):
        return False

    wkt_pattern = (
        r'^\s*(POINT|LINESTRING|LINEARRING|POLYGON|MULTIPOINT|MULTILINESTRING'
        r'|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*(ZM|Z|M)?\s*(\(.*\)|EMPTY)\s*$'
    )
    ### DOTALL lets `.*` span newlines in multi-line WKT.
    if re.match(wkt_pattern, value, re.IGNORECASE | re.DOTALL):
        return True

    ### An empty string is neither WKT nor WKB.
    if value and len(value) % 2 == 0 and all(c in '0123456789ABCDEFabcdef' for c in value):
        return False

    return None

Determine whether an input value should be treated as WKT or WKB geometry data.

Parameters
  • value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
  • A bool (True if value is WKT and False if it should be treated as WKB).
  • Return None if value should be parsed as neither.
def geometry_is_gpkg(value: bytes) -> bool:
def geometry_is_gpkg(value: bytes) -> bool:
    """
    Check for the GeoPackage binary magic number (`b'GP'`) at the start of `value`.
    Non-`bytes` inputs and inputs shorter than two bytes are never GeoPackage WKB.
    """
    return isinstance(value, bytes) and len(value) >= 2 and value[:2] == b'GP'

Return whether the input value is formatted as GeoPackage WKB.

def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, int]:
    """
    Converts GeoPackage WKB to standard WKB by removing the header.

    The GeoPackage binary header is: magic `b'GP'` (2 bytes), version (1 byte),
    flags (1 byte), SRS ID (int32, 4 bytes), then an optional envelope.
    Flag bit 0 gives the byte order of the header fields (1 = little-endian,
    0 = big-endian); flag bits 1-3 give the envelope type.

    Parameters
    ----------
    gpkg_wkb_bytes: bytes
        The GeoPackage WKB byte string.

    Returns
    -------
    A tuple containing the standard WKB bytes, SRID, and flags (as an `int`).

    Raises
    ------
    ValueError
        If the input does not start with the `b'GP'` magic number.
    struct.error
        If the header is too short to contain the flags and SRS ID.
    """
    magic_number = gpkg_wkb_bytes[0:2]
    if magic_number != b'GP':
        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")

    ### Version and flags are single bytes, so byte order does not matter here.
    _version, flags = struct.unpack_from('<BB', gpkg_wkb_bytes, 2)

    ### The SRS ID's byte order comes from flag bit 0.
    byte_order = '<' if flags & 0x01 else '>'
    if len(gpkg_wkb_bytes) >= 8:
        (srid,) = struct.unpack_from(byte_order + 'i', gpkg_wkb_bytes, 4)
    else:
        ### Truncated header: fall back to a 2-byte SRS ID
        ### (raises `struct.error` if even that is missing).
        (srid,) = struct.unpack_from(byte_order + 'h', gpkg_wkb_bytes, 4)

    ### Envelope sizes in bytes: 4, 6, 6, or 8 doubles (types 1-4); none otherwise.
    envelope_type = (flags >> 1) & 0x07
    envelope_sizes = {
        0: 0,
        1: 32,
        2: 48,
        3: 48,
        4: 64,
    }
    header_length = 8 + envelope_sizes.get(envelope_type, 0)
    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
    return standard_wkb_bytes, srid, flags

Converts GeoPackage WKB to standard WKB by removing the header.

Parameters
  • gpkg_wkb_bytes (bytes): The GeoPackage WKB byte string.
Returns
  • A tuple containing the standard WKB bytes, SRID, and flags.
def value_is_null(value: Any) -> bool:
def value_is_null(value: Any) -> bool:
    """
    Return `True` when `str(value)`, compared case-insensitively, is a common
    null spelling: `None`, `NaN`, `NA`, `NaT`, `NaTz`, `<NA>`, or the empty string.
    """
    null_spellings = {'none', 'nan', 'na', 'nat', 'natz', '', '<na>'}
    return str(value).lower() in null_spellings

Determine if a value is a null-like string.

def none_if_null(value: Any) -> Any:
def none_if_null(value: Any) -> Any:
    """
    Map null-like values (as judged by `value_is_null`) to `None`;
    pass every other value through unchanged.
    """
    if value_is_null(value):
        return None
    return value

Return None if a value is a null-like string.

def quantize_decimal(x: decimal.Decimal, precision: int, scale: int) -> decimal.Decimal:
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision, rounding half up.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision.

    Raises
    ------
    ValueError
        If the quantized value would need more than `precision` digits.
    """
    ### Only the exponent of this template matters to `quantize()`; it has `scale` fractional digits.
    integer_digits = '1' * (precision - scale)
    fractional_digits = '1' * scale
    exponent_template = Decimal(f"{integer_digits}.{fractional_digits}")
    quantize_context = Context(prec=precision)

    try:
        quantized = x.quantize(
            exponent_template,
            context=quantize_context,
            rounding=ROUND_HALF_UP,
        )
    except InvalidOperation:
        quantized = None

    if quantized is None:
        raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
    return quantized

Quantize a given Decimal to a known scale and precision.

Parameters
  • x (Decimal): The Decimal to be quantized.
  • precision (int): The total number of significant digits.
  • scale (int): The number of significant digits after the decimal point.
Returns
  • A Decimal quantized to the specified scale and precision.
def serialize_decimal( x: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization. Quantization requires a positive `precision` and a
        non-`None` `scale` (a scale of `0` rounds to an integer).

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.

    Returns
    -------
    A string of the input decimal (fixed-point notation), `None` for null-like decimals
    (e.g. `NaN`), or the input unchanged if it is not a `Decimal`.
    """
    if not isinstance(x, Decimal):
        return x

    if value_is_null(x):
        return None

    ### NOTE: `scale=0` is a valid scale (integers) and must not be treated as unset.
    if quantize and precision and scale is not None:
        x = quantize_decimal(x, precision, scale)

    return f"{x:f}"

Return a quantized string of an input decimal.

Parameters
  • x (Any): The potential decimal to be serialized.
  • quantize (bool, default False): If True, quantize the incoming Decimal to the specified scale and precision before serialization.
  • precision (Optional[int], default None): The precision of the decimal to be quantized.
  • scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
  • A string of the input decimal or the input if not a Decimal.
def coerce_timezone(dt: Any, strip_utc: bool = False) -> Any:
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip the timezone if `strip_utc` is `True`).

    Timezone-naive values are treated as already being in UTC;
    timezone-aware values (scalars and Series) are converted to UTC.

    Parameters
    ----------
    dt: Any
        The value to coerce. `None` and integers are returned unchanged,
        and strings are parsed with `dateutil` (unparseable strings are returned unchanged).

    strip_utc: bool, default False
        If `True`, return timezone-naive values holding the UTC wall-clock time.

    Returns
    -------
    The UTC-aware (or UTC-stripped) value.
    """
    if dt is None:
        return None

    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt)
        except Exception:
            return dt

    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
    if dt_is_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        if pandas.api.types.is_datetime64_any_dtype(dt):
            if dt.dt.tz is None:
                ### Naive datetimes are assumed to already be in UTC.
                return dt if strip_utc else dt.dt.tz_localize(timezone.utc)

            ### Convert any timezone (not only UTC) to UTC before optionally stripping it,
            ### matching the scalar `astimezone()` behavior below.
            utc_series = dt.dt.tz_convert(timezone.utc)
            return utc_series.dt.tz_localize(None) if strip_utc else utc_series

        dt_series = to_datetime(dt, coerce_utc=False)
        if dt_series.dt.tz is None:
            dt_series = dt_series.dt.tz_localize(timezone.utc)
        else:
            dt_series = dt_series.dt.tz_convert(timezone.utc)

        if strip_utc:
            ### Best-effort: keep the UTC-aware series if stripping fails.
            try:
                dt_series = dt_series.dt.tz_localize(None)
            except Exception:
                pass

        return dt_series

    if dt.tzinfo is None:
        if strip_utc:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    utc_dt = dt.astimezone(timezone.utc)
    if strip_utc:
        return utc_dt.replace(tzinfo=None)
    return utc_dt

Given a datetime, pandas Timestamp or Series of Timestamp, return a UTC timestamp (strip the timezone if strip_utc is True).

def to_datetime( dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True, precision_unit: Optional[str] = None) -> Any:
def to_datetime(
    dt_val: Any,
    as_pydatetime: bool = False,
    coerce_utc: bool = True,
    precision_unit: Optional[str] = None,
) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The value to coerce to Pandas Timestamps (a scalar or a Series).

    as_pydatetime: bool, default False
        If `True`, return a Python datetime object.

    coerce_utc: bool, default True
        If `True`, ensure the value has UTC tzinfo.
        NOTE: scalars successfully parsed by `pandas.to_datetime()` are always returned in UTC.

    precision_unit: Optional[str], default None
        If provided, enforce the provided precision unit.
        Otherwise Series keep their existing datetime precision,
        and parsed scalars are cast to the default `microsecond` unit.

    Returns
    -------
    A `Timestamp`, a Series of datetimes, a Python `datetime` (if `as_pydatetime`),
    or the (possibly unchanged) input if it could not be parsed.

    Raises
    ------
    ValueError
        If `precision_unit` is not a recognized precision unit.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    enforce_precision = precision_unit is not None
    precision_unit = precision_unit or 'microsecond'
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
    if not precision_abbreviation:
        raise ValueError(f"Invalid precision '{precision_unit}'.")

    def parse(x: Any) -> Any:
        """Parse `x` with `dateutil`, returning `x` unchanged on failure."""
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
        """Return whether `dtype_to_check` already satisfies the target datetime dtype."""
        dtype_check_against = (
            f"datetime64[{precision_abbreviation}, UTC]"
            if with_utc
            else f"datetime64[{precision_abbreviation}]"
        )
        return (
            dtype_to_check == dtype_check_against
            if enforce_precision
            else (
                dtype_to_check.startswith('datetime64[')
                and (
                    ('utc' in dtype_to_check.lower())
                    if with_utc
                    else ('utc' not in dtype_to_check.lower())
                )
            )
        )

    ### NOTE: Type checks and exception classes come from `pandas` itself;
    ### Dask series are never `pandas.Timestamp` instances and go through the series branch.
    if isinstance(dt_val, pandas.Timestamp):
        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
        return (
            coerce_timezone(dt_val_to_return)
            if coerce_utc
            else dt_val_to_return
        )

    if dt_is_series:
        changed_tz = False
        original_tz = None
        dtype = str(getattr(dt_val, 'dtype', 'object'))
        if (
            are_dtypes_equal(dtype, 'datetime')
            and 'utc' not in dtype.lower()
            and hasattr(dt_val, 'dt')
        ):
            original_tz = dt_val.dt.tz
            dt_val = dt_val.dt.tz_localize(timezone.utc)
            changed_tz = True
            dtype = str(getattr(dt_val, 'dtype', 'object'))

        new_dt_series = None
        try:
            new_dt_series = (
                dt_val
                if check_dtype(dtype, with_utc=True)
                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
            )
        except pandas.errors.OutOfBoundsDatetime:
            ### Retry with the next coarser unit, which can represent a wider date range.
            try:
                next_precision = get_next_precision_unit(true_precision_unit)
                next_precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbreviation}, UTC]")
            except Exception:
                new_dt_series = None
        except ValueError:
            new_dt_series = None
        except TypeError:
            ### The timezone-aware cast failed; try a timezone-naive cast instead.
            ### Inspect `dt_val`'s dtype here: `new_dt_series` was not assigned by the failed cast.
            try:
                new_dt_series = (
                    dt_val
                    if check_dtype(dtype, with_utc=False)
                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
                )
            except Exception:
                new_dt_series = None

        if new_dt_series is None:
            new_dt_series = dt_val.apply(lambda x: parse(str(x)))

        if coerce_utc:
            return coerce_timezone(new_dt_series)

        if changed_tz:
            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
        return new_dt_series

    try:
        new_dt_val = pandas.to_datetime(dt_val, utc=True, format='ISO8601')
        if new_dt_val.unit != precision_abbreviation:
            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pandas.errors.OutOfBoundsDatetime, ValueError):
        pass

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)

Wrap pd.to_datetime() and add support for out-of-bounds values.

Parameters
  • dt_val (Any): The value to coerce to Pandas Timestamps.
  • as_pydatetime (bool, default False): If True, return a Python datetime object.
  • coerce_utc (bool, default True): If True, ensure the value has UTC tzinfo.
  • precision_unit (Optional[str], default None): If provided, enforce the provided precision unit.
def serialize_bytes(data: bytes) -> str:
def serialize_bytes(data: bytes) -> str:
    """
    Return the given bytes as a base64-encoded (UTF-8 decoded) string.
    Null-like non-`bytes` values (per `value_is_null()`) are returned unchanged.
    """
    import base64
    if not isinstance(data, bytes):
        if value_is_null(data):
            return data
    encoded = base64.b64encode(data)
    return encoded.decode('utf-8')

Return the given bytes as a base64-encoded string.

def serialize_geometry( geom: Any, geometry_format: str = 'wkb_hex', srid: Optional[int] = None) -> Union[str, Dict[str, Any], bytes, NoneType]:
def serialize_geometry(
    geom: Any,
    geometry_format: str = 'wkb_hex',
    srid: Optional[int] = None,
) -> Union[str, Dict[str, Any], bytes, None]:
    """
    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

    Parameters
    ----------
    geom: Any
        The potential geometry data to be serialized.

    geometry_format: str, default 'wkb_hex'
        The serialization format for geometry data.
        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
        Any other format falls back to WKT.

    srid: Optional[int], default None
        If provided, use this as the source CRS when serializing to GeoJSON
        (the geometry is reprojected to EPSG:4326), and as the SRS ID written
        into the header when serializing to `gpkg_wkb` (`-1` if not provided).

    Returns
    -------
    A string containing the geometry data, or bytes, or a dictionary, or None.
    For formats other than GeoJSON, values without a `wkb_hex` attribute
    are returned as `str(geom)`.
    """
    if value_is_null(geom):
        return None

    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
        'shapely', 'shapely.ops', 'pyproj', 'numpy',
        lazy=False,
    )
    if geometry_format == 'geojson':
        if srid:
            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
            geom = shapely_ops.transform(transformer.transform, geom)
        geojson_str = shapely.to_geojson(geom)
        return json.loads(geojson_str)

    if not hasattr(geom, 'wkb_hex'):
        return str(geom)

    if geometry_format.startswith("wkb"):
        return shapely.to_wkb(geom, hex=(geometry_format == "wkb_hex"), include_srid=True)

    if geometry_format == 'gpkg_wkb':
        byte_order = 1 if np.little_endian else 0
        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)

        ### GeoPackage flags: bit 0 = byte order (1 = little-endian),
        ### bits 1-3 = envelope indicator (0 = no envelope),
        ### bit 4 (0x10) = empty geometry, bit 5 (0x20) = extended type (unused here).
        flags = (byte_order & 0x01) | (0x10 if geom.is_empty else 0x00)

        ### The header's SRS ID must be written in the byte order declared by the flags.
        struct_prefix = '<' if byte_order else '>'
        srid_val = srid or -1
        header = struct.pack(
            f'{struct_prefix}ccBBi',
            b'G', b'P',
            0,
            flags,
            srid_val,
        )
        return header + wkb_data

    return shapely.to_wkt(geom)

Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

Parameters
  • geom (Any): The potential geometry data to be serialized.
  • geometry_format (str, default 'wkb_hex'): The serialization format for geometry data. Accepted formats are wkb, wkb_hex, wkt, geojson, and gpkg_wkb.
  • srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON, and as the SRS ID written into the header when serializing to gpkg_wkb.
Returns
  • A string containing the geometry data, or bytes, or a dictionary, or None.
def deserialize_geometry(geom_wkb: Union[str, bytes]):
def deserialize_geometry(geom_wkb: Union[str, bytes]):
    """
    Deserialize a WKB payload (`str` or `bytes`) into a shapely geometry object
    using `shapely.wkb.loads()`.
    """
    shapely_module = mrsm.attempt_import('shapely', lazy=False)
    wkb_reader = shapely_module.wkb
    return wkb_reader.loads(geom_wkb)

Deserialize a WKB string into a shapely geometry object.

def project_geometry(geom, srid: int, to_srid: int = 4326):
def project_geometry(geom, srid: int, to_srid: int = 4326):
    """
    Project a shapely geometry object from `EPSG:{srid}` to `EPSG:{to_srid}`
    (default `EPSG:4326`), using `always_xy=True` axis ordering.
    """
    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
    source_crs = f"EPSG:{srid}"
    target_crs = f"EPSG:{to_srid}"
    transformer = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True)
    return shapely_ops.transform(transformer.transform, geom)

Project a shapely geometry object to a new CRS (SRID).

def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Optional[bytes]:
def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
    """
    Given a serialized ASCII string of bytes data, return the original bytes.
    The input data may either be base64- or hex-encoded.

    Parameters
    ----------
    data: Optional[str]
        The string to be deserialized into bytes.
        Strings prefixed with `'\\x'` are decoded as hex; anything else as base64.

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded even without the `'\\x'` prefix.
        A leading `'\\x'` prefix is still stripped if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    Null-like non-string values are returned unchanged.
    """
    if not isinstance(data, str):
        if value_is_null(data):
            return data

    import binascii
    import base64

    hex_prefix = '\\x'
    if not force_hex and not data.startswith(hex_prefix):
        return base64.b64decode(data)

    hex_digits = data[len(hex_prefix):] if data.startswith(hex_prefix) else data
    return binascii.unhexlify(hex_digits)

Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.

Parameters
  • data (Optional[str]): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with '\x').
  • force_hex (bool = False): If True, treat the input string as hex-encoded. Set force_hex to True for hex-encoded data that does not begin with the prefix '\x'. A leading '\x' prefix is still stripped if present.
Returns
  • The original bytes used to produce the encoded string data.
def deserialize_base64(data: str) -> bytes:
def deserialize_base64(data: str) -> bytes:
    """
    Decode a base64-encoded string and return the original bytes.
    """
    from base64 import b64decode
    decoded = b64decode(data)
    return decoded

Return the original bytestring from the given base64-encoded string.

def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Optional[str]:
def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
    """
    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.

    Parameters
    ----------
    data: bytes
        The bytes to encode. Null-like non-`bytes` values are returned unchanged.

    with_prefix: bool, default True
        If `True`, prepend the `'\\x'` prefix.

    Returns
    -------
    A lowercase hex string (optionally prefixed), or `data` unchanged if it is null-like.
    """
    import binascii
    if not isinstance(data, bytes):
        if value_is_null(data):
            return data
    prefix = '\\x' if with_prefix else ''
    hex_body = binascii.hexlify(data).decode('utf-8')
    return f"{prefix}{hex_body}"

Return the given bytes as a hex string for PostgreSQL's BYTEA type.

def serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Timezone-naive values get a `'Z'` suffix (i.e. they are labeled as UTC);
    timezone-aware values keep the offset produced by `isoformat()`.

    Parameters
    ----------
    dt: datetime
        The value to serialize.

    Returns
    -------
    The ISO-formatted string, or `None` if `dt` has no `isoformat()` method.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not hasattr(dt, 'isoformat'):
        return None

    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
    return dt.isoformat() + tz_suffix

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def serialize_date(d: datetime.date) -> Optional[str]:
def serialize_date(d: date) -> Union[str, None]:
    """
    Serialize a date object into its ISO representation,
    or return `None` for values without an `isoformat()` method.
    """
    if not hasattr(d, 'isoformat'):
        return None
    return d.isoformat()

Serialize a date object into its ISO representation.

def json_serialize_value(x: Any, default_to_str: bool = True) -> Optional[str]:
 963def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
 964    """
 965    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
 966
 967    Parameters
 968    ----------
 969    x: Any
 970        The value to serialize.
 971
 972    default_to_str: bool, default True
 973        If `True`, return a string of `x` if x is not a designated type.
 974        Otherwise return x.
 975
 976    Returns
 977    -------
 978    A serialized version of x, or x.
 979    """
 980    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
 981        return x.meta
 982
 983    if hasattr(x, 'tzinfo'):
 984        return serialize_datetime(x)
 985
 986    if hasattr(x, 'isoformat'):
 987        return serialize_date(x)
 988
 989    if isinstance(x, bytes):
 990        return serialize_bytes(x)
 991
 992    if isinstance(x, Decimal):
 993        return serialize_decimal(x)
 994
 995    if 'shapely' in str(type(x)):
 996        return serialize_geometry(x)
 997
 998    if value_is_null(x):
 999        return None
1000
1001    if isinstance(x, (dict, list, tuple)):
1002        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
1003
1004    return str(x) if default_to_str else x

Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

Parameters
  • x (Any): The value to serialize.
  • default_to_str (bool, default True): If True, return a string of x if x is not a designated type. Otherwise return x.
Returns
  • A serialized version of x, or x.
def get_geometry_type_srid( dtype: str = 'geometry', default_type: str = 'geometry', default_srid: Union[int, str] = 0) -> Tuple[str, Union[int, str, NoneType]]:
def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: Union[int, str] = 0,
) -> Tuple[str, Union[int, str, None]]:
    """
    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: str, default 'geometry'
        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
        PostGIS-style parentheses (e.g. `geometry(Point, 4326)`) are also accepted.
        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

        These type names are matched case-insensitively and returned in the casing shown above;
        other type names are returned as given. An SRID may be an integer or an
        authority string containing `:` (e.g. `ESRI:102003`).

    default_type: str, default 'geometry'
        The geometry type to return if `dtype` does not specify one.

    default_srid: Union[int, str], default 0
        The SRID to return if `dtype` does not specify one.

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
    ('MultiLineString', 4326)
    >>> get_geometry_type_srid('geography')
    ('geometry', 0)
    >>> get_geometry_type_srid('geography[POINT]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[POINT, ESRI:102003]')
    ('Point', 'ESRI:102003')
    """
    from meerschaum.utils.misc import is_int
    canonical_geometry_types = {
        name.lower(): name
        for name in (
            'Point',
            'LineString',
            'LinearRing',
            'Polygon',
            'MultiPoint',
            'MultiLineString',
            'MultiPolygon',
            'GeometryCollection',
        )
    }

    ### NOTE: PostGIS syntax must also be parsed.
    dtype = dtype.replace('(', '[').replace(')', ']')
    bare_dtype = dtype.split('[', maxsplit=1)[0]
    ### `bare_dtype` is a prefix of `dtype`, so slice it off (safe even when it is empty).
    modifier = dtype[len(bare_dtype):].lstrip('[').rstrip(']')
    if not modifier:
        return default_type, default_srid

    ### Allow keyword-style parts such as `srid=4326`.
    parts = [
        part.split('=')[-1].strip()
        for part in modifier.split(',')
    ]
    parts_casted = [
        (int(part) if is_int(part) else part)
        for part in parts
    ]

    srid = default_srid
    geometry_type = default_type

    for part in parts_casted:
        if isinstance(part, int) or ':' in str(part):
            srid = part
            break

    for part in parts_casted:
        ### Skip empty parts (e.g. from a trailing comma) and the SRID itself.
        if isinstance(part, str) and part and part != srid:
            geometry_type = canonical_geometry_types.get(part.lower(), part)
            break

    return geometry_type, srid

Given the specified geometry dtype, return a tuple in the form (type, SRID).

Parameters
  • dtype (str, default 'geometry'): Optionally provide a specific geometry syntax (e.g. geometry[MultiLineString, 4326]). You may specify a supported shapely geometry type and an SRID in the dtype modifier:

    • Point
    • LineString
    • LinearRing
    • Polygon
    • MultiPoint
    • MultiLineString
    • MultiPolygon
    • GeometryCollection
Returns
  • A tuple in the form (type, SRID).
  • Defaults to (default_type, default_srid).
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 0)
>>> get_geometry_type_srid('geometry[]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MultiLineString', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 0)
>>> get_geometry_type_srid('geography[POINT]')
('Point', 0)
>>> get_geometry_type_srid('geometry[POINT, ESRI:102003]')
('Point', 'ESRI:102003')
def get_current_timestamp( precision_unit: str = 'microsecond', precision_interval: int = 1, round_to: str = 'down', as_pandas: bool = False, as_int: bool = False, _now: Union[datetime.datetime, int, NoneType] = None) -> 'Union[datetime, pd.Timestamp, int]':
def get_current_timestamp(
    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
    precision_interval: int = 1,
    round_to: str = 'down',
    as_pandas: bool = False,
    as_int: bool = False,
    _now: Union[datetime, int, None] = None,
) -> 'Union[datetime, pd.Timestamp, int]':
    """
    Return the current UTC timestamp, rounded to the given precision unit.

    Parameters
    ----------
    precision_unit: str, default 'microsecond'
        The precision of the timestamp to be returned
        (the default comes from the static configuration).
        Valid values are the following:
            - `ns` / `nanosecond`
            - `us` / `microsecond`
            - `ms` / `millisecond`
            - `s` / `sec` / `second`
            - `m` / `min` / `minute`
            - `h` / `hr` / `hour`
            - `d` / `day`

    precision_interval: int, default 1
        Round the timestamp to the `precision_interval` units.
        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
        Note: `precision_interval` must be 1 when `precision='nanosecond'`.

    round_to: str, default 'down'
        The direction to which to round the timestamp.
        Available options are `down`, `up`, and `closest`.

    as_pandas: bool, default False
        If `True`, return a Pandas Timestamp.
        This is always true if `unit` is `nanosecond` (unless `as_int` is `True`).

    as_int: bool, default False
        If `True`, return the timestamp as an integer number of `precision_unit`s
        since the Unix epoch.
        Overrides `as_pandas`.

    _now: Union[datetime, int, None], default None
        Private override for the current time: an `int` of nanoseconds
        (nanosecond precision only) or a `datetime` (other precisions).

    Returns
    -------
    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.

    Raises
    ------
    ValueError
        If `precision_unit` is not recognized.

    Examples
    --------
    >>> get_current_timestamp('ns')
    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
    >>> get_current_timestamp('ms', as_pandas=True)
    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
        from meerschaum.utils.misc import items_str
        raise ValueError(
            f"Unknown precision unit '{precision_unit}'. "
            "Accepted values are "
            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
        )

    if not as_int:
        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None

    if true_precision_unit == 'nanosecond':
        if precision_interval != 1:
            warn("`precision_interval` must be 1 for nanosecond precision.")
        now_ts = time.time_ns() if not isinstance(_now, int) else _now
        if as_int:
            return now_ts
        return pd.to_datetime(now_ts, unit='ns', utc=True)

    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
    delta = timedelta(**{true_precision_unit + 's': precision_interval})
    rounded_now = round_time(now, delta, to=round_to)

    if as_int:
        ### Exact integer `timedelta` division avoids float `timestamp() * scalar` truncation,
        ### which can land one unit too low (e.g. 29212918.999... minutes).
        ### Naive datetimes are interpreted as local time, as `datetime.timestamp()` does.
        unit_delta = timedelta(**{true_precision_unit + 's': 1})
        aware_now = (
            rounded_now
            if rounded_now.utcoffset() is not None
            else rounded_now.astimezone()
        )
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        return (aware_now - epoch) // unit_delta

    if not as_pandas:
        return rounded_now

    ts_val = pd.to_datetime(rounded_now, utc=True)

    ### Only these units are valid `as_unit()` resolutions; coarser units keep the default.
    as_unit_precisions = ('microsecond', 'millisecond', 'second')
    if true_precision_unit not in as_unit_precisions:
        return ts_val

    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])

Return the current UTC timestamp, rounded to the given precision unit.

Parameters
  • precision_unit (str, default 'us'): The precision of the timestamp to be returned. Valid values are: ns / nanosecond, us / microsecond, ms / millisecond, s / sec / second, m / min / minute, h / hr / hour, d / day.
  • precision_interval (int, default 1): Round the timestamp to the precision_interval units. For example, precision='minute' and precision_interval=15 will round to 15-minute intervals. Note: precision_interval must be 1 when precision='nanosecond'.
  • round_to (str, default 'down'): The direction to which to round the timestamp. Available options are down, up, and closest.
  • as_pandas (bool, default False): If True, return a Pandas Timestamp. This is always true if unit is nanosecond.
  • as_int (bool, default False): If True, return the timestamp to an integer. Overrides as_pandas.
Returns
  • A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms', as_pandas=True)
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
def is_dtype_special(type_: str) -> bool:
def is_dtype_special(type_: str) -> bool:
    """
    Return whether a dtype should be treated as a special Meerschaum dtype.
    This is not the same as a Meerschaum alias.

    Aliases in `MRSM_ALIAS_DTYPES` are resolved first (e.g. `'decimal'` -> `'numeric'`).
    """
    resolved_type = MRSM_ALIAS_DTYPES.get(type_, type_)

    special_types = {
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    }
    if resolved_type in special_types:
        return True

    ### Defer to `are_dtypes_equal()` for other datetime / date spellings.
    if are_dtypes_equal(resolved_type, 'datetime') or are_dtypes_equal(resolved_type, 'date'):
        return True

    ### Parameterized variants, e.g. `numeric[10,2]` or `geometry[Point, 4326]`.
    special_prefixes = ('numeric', 'bool', 'geometry', 'geography')
    return resolved_type.startswith(special_prefixes)

Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.

def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the next precision string in order of value.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).
        Aliases are resolved via `MRSM_PRECISION_UNITS_ALIASES`.

    decrease: bool, default True
        If `True`, return the next coarser precision unit (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the next finer precision unit (e.g. `hour` -> `minute`).

    Returns
    -------
    A `precision` string which is coarser or finer than the given precision unit.

    Raises
    ------
    ValueError
        If `precision_unit` is unknown, or no unit exists beyond it in the requested direction.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
    if not precision_scalar:
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    ### Ascending by units-per-second: coarsest (`day`) first, finest (`nanosecond`) last.
    precisions = sorted(
        MRSM_PRECISION_UNITS_SCALARS,
        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p],
    )

    precision_index = precisions.index(true_precision_unit)
    new_precision_index = precision_index + (-1 if decrease else 1)
    if not 0 <= new_precision_index < len(precisions):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return precisions[new_precision_index]

Get the next precision string in order of value.

Parameters
  • precision_unit (str): The precision string ('nanosecond', 'ms', etc.).
  • decrease (bool, default True): If True, return the next coarser precision unit (e.g. nanosecond -> microsecond). If False, return the next finer precision unit (e.g. hour -> minute).
Returns
  • A precision string which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: str = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.

    Multiples are counted from `datetime.min`, so e.g. a one-hour delta snaps
    to the top of the hour. All arithmetic is done on exact integer
    microseconds, so the result does not depend on the active `decimal` context.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime (returned as a naive datetime).
        If `dt` is timezone-aware, rounding is computed on its wall-clock value
        and the original `tzinfo` is kept on the result.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.
        A zero delta returns `dt` unchanged; a negative delta behaves like its
        absolute value.

    to: str, default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any other value is treated as `'closest'`, where an exact half-step
        rounds up.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    def _total_microseconds(td: timedelta) -> int:
        # Exact integer microseconds (avoids float error from `total_seconds()`).
        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds

    # The sign of the delta does not change which multiples exist.
    step = abs(_total_microseconds(date_delta))
    if step == 0:
        return dt

    # Strip tzinfo so both naive and aware values can be measured from `datetime.min`;
    # the adjustment is applied to the original `dt`, preserving its tzinfo.
    dt_total = _total_microseconds(dt.replace(tzinfo=None) - datetime.min)
    num_intervals, remainder = divmod(dt_total, step)

    if to == 'down':
        round_up = False
    elif to == 'up':
        round_up = remainder > 0
    else:
        # 'closest' (and any unrecognized value): ties round up, i.e. ROUND_HALF_UP.
        round_up = 2 * remainder >= step

    rounded_total = (num_intervals + int(round_up)) * step
    return dt + timedelta(microseconds=rounded_total - dt_total)

Round a datetime object to a multiple of a timedelta.

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to (str, default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)