meerschaum.utils.dtypes

Utility functions for working with data types.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with data types.
   7"""
   8
   9import traceback
  10import json
  11import uuid
  12import time
  13import struct
  14from datetime import timezone, datetime, date, timedelta
  15from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP
  16
  17import meerschaum as mrsm
  18from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple
  19from meerschaum.utils.warnings import warn
  20from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG
  21
  22MRSM_ALIAS_DTYPES: Dict[str, str] = {
  23    'decimal': 'numeric',
  24    'Decimal': 'numeric',
  25    'number': 'numeric',
  26    'jsonl': 'json',
  27    'JSON': 'json',
  28    'binary': 'bytes',
  29    'blob': 'bytes',
  30    'varbinary': 'bytes',
  31    'bytea': 'bytes',
  32    'guid': 'uuid',
  33    'UUID': 'uuid',
  34    'geom': 'geometry',
  35    'geog': 'geography',
  36    'boolean': 'bool',
  37    'day': 'date',
  38}
  39MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
  40    'json': 'object',
  41    'numeric': 'object',
  42    'geometry': 'object',
  43    'geography': 'object',
  44    'uuid': 'object',
  45    'date': 'date32[day][pyarrow]',
  46    'datetime': 'datetime64[us, UTC]',
  47    'bool': 'bool[pyarrow]',
  48    'int': 'int64[pyarrow]',
  49    'int8': 'int8[pyarrow]',
  50    'int16': 'int16[pyarrow]',
  51    'int32': 'int32[pyarrow]',
  52    'int64': 'int64[pyarrow]',
  53    'str': 'string',
  54    'bytes': 'binary[pyarrow]',
  55    None: 'object',
  56}
  57
  58MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
  59    'nanosecond': 1_000_000_000,
  60    'microsecond': 1_000_000,
  61    'millisecond': 1000,
  62    'second': 1,
  63    'minute': (1 / 60),
  64    'hour': (1 / 3600),
  65    'day': (1 / 86400),
  66}
  67
  68MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
  69    'ns': 'nanosecond',
  70    'us': 'microsecond',
  71    'ms': 'millisecond',
  72    's': 'second',
  73    'sec': 'second',
  74    'm': 'minute',
  75    'min': 'minute',
  76    'h': 'hour',
  77    'hr': 'hour',
  78    'd': 'day',
  79    'D': 'day',
  80}
  81MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
  82    'nanosecond': 'ns',
  83    'microsecond': 'us',
  84    'millisecond': 'ms',
  85    'second': 's',
  86    'minute': 'min',
  87    'hour': 'hr',
  88    'day': 'D',
  89}
  90
  91
  92def to_pandas_dtype(dtype: str) -> str:
  93    """
  94    Cast a supported Meerschaum dtype to a Pandas dtype.
  95    """
  96    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
  97    if known_dtype is not None:
  98        return known_dtype
  99
 100    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
 101    if alias_dtype is not None:
 102        return MRSM_PD_DTYPES[alias_dtype]
 103
 104    if dtype.startswith('numeric'):
 105        return MRSM_PD_DTYPES['numeric']
 106
 107    if dtype.startswith('geometry'):
 108        return MRSM_PD_DTYPES['geometry']
 109
 110    if dtype.startswith('geography'):
 111        return MRSM_PD_DTYPES['geography']
 112
 113    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
 114    ### treat it as a SQL db type.
 115    if dtype.split(' ')[0].isupper():
 116        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
 117        return get_pd_type_from_db_type(dtype)
 118
 119    from meerschaum.utils.packages import attempt_import
 120    _ = attempt_import('pyarrow', lazy=False)
 121    pandas = attempt_import('pandas', lazy=False)
 122
 123    try:
 124        return str(pandas.api.types.pandas_dtype(dtype))
 125    except Exception:
 126        warn(
 127            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
 128            + f"{traceback.format_exc()}",
 129            stack=False,
 130        )
 131    return 'object'
 132
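### Illustrative mapping (a sketch; values follow directly from MRSM_PD_DTYPES above):
# >>> to_pandas_dtype('numeric')
# 'object'
# >>> to_pandas_dtype('decimal')   # alias resolved via MRSM_ALIAS_DTYPES
# 'object'
# >>> to_pandas_dtype('datetime')
# 'datetime64[us, UTC]'
# >>> to_pandas_dtype('int')
# 'int64[pyarrow]'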
 133
 134def are_dtypes_equal(
 135    ldtype: Union[str, Dict[str, str]],
 136    rdtype: Union[str, Dict[str, str]],
 137) -> bool:
 138    """
 139    Determine whether two dtype strings may be considered
 140    equivalent to avoid unnecessary conversions.
 141
 142    Parameters
 143    ----------
 144    ldtype: Union[str, Dict[str, str]]
 145        The left dtype to compare.
 146        May also provide a dtypes dictionary.
 147
 148    rdtype: Union[str, Dict[str, str]]
 149        The right dtype to compare.
 150        May also provide a dtypes dictionary.
 151
 152    Returns
 153    -------
 154    A `bool` indicating whether the two dtypes are to be considered equivalent.
 155    """
 156    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
 157        lkeys = sorted([str(k) for k in ldtype.keys()])
 158        rkeys = sorted([str(k) for k in rdtype.keys()])
 159        for lkey, rkey in zip(lkeys, rkeys):
 160            if lkey != rkey:
 161                return False
 162            ltype = ldtype[lkey]
 163            rtype = rdtype[rkey]
 164            if not are_dtypes_equal(ltype, rtype):
 165                return False
 166        return True
 167
 168    try:
 169        if ldtype == rdtype:
 170            return True
 171    except Exception:
 172        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
 173        return False
 174
 175    ### Sometimes pandas dtype objects are passed.
 176    ldtype = str(ldtype).split('[', maxsplit=1)[0]
 177    rdtype = str(rdtype).split('[', maxsplit=1)[0]
 178
 179    if ldtype in MRSM_ALIAS_DTYPES:
 180        ldtype = MRSM_ALIAS_DTYPES[ldtype]
 181
 182    if rdtype in MRSM_ALIAS_DTYPES:
 183        rdtype = MRSM_ALIAS_DTYPES[rdtype]
 184
 185    json_dtypes = ('json', 'object')
 186    if ldtype in json_dtypes and rdtype in json_dtypes:
 187        return True
 188
 189    numeric_dtypes = ('numeric', 'decimal', 'object')
 190    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
 191        return True
 192
 193    uuid_dtypes = ('uuid', 'object')
 194    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
 195        return True
 196
 197    bytes_dtypes = ('bytes', 'object', 'binary')
 198    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
 199        return True
 200
 201    geometry_dtypes = ('geometry', 'object', 'geography')
 202    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
 203        return True
 204
 205    if ldtype.lower() == rdtype.lower():
 206        return True
 207
 208    datetime_dtypes = ('datetime', 'timestamp')
 209    ldtype_found_dt_prefix = False
 210    rdtype_found_dt_prefix = False
 211    for dt_prefix in datetime_dtypes:
 212        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
 213        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
 214    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
 215        return True
 216
 217    string_dtypes = ('str', 'string', 'object')
 218    if ldtype in string_dtypes and rdtype in string_dtypes:
 219        return True
 220
 221    int_dtypes = (
 222        'int', 'int64', 'int32', 'int16', 'int8',
 223        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
 224    )
 225    int_substrings = ('int',)
 226    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
 227        return True
 228    for substring in int_substrings:
 229        if substring in ldtype.lower() and substring in rdtype.lower():
 230            return True
 231
 232    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
 233    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
 234        return True
 235
 236    bool_dtypes = ('bool', 'boolean')
 237    if ldtype in bool_dtypes and rdtype in bool_dtypes:
 238        return True
 239
 240    date_dtypes = (
 241        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
 242        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
 243    )
 244    if ldtype in date_dtypes and rdtype in date_dtypes:
 245        return True
 246
 247    return False
 248
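### Illustrative comparisons (a sketch of the equivalence rules above):
# >>> are_dtypes_equal('int64[pyarrow]', 'Int32')
# True
# >>> are_dtypes_equal('datetime64[ns, UTC]', 'TIMESTAMP')
# True
# >>> are_dtypes_equal('float64', 'double')
# True
# >>> are_dtypes_equal('int', 'float')
# False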
 249
 250def is_dtype_numeric(dtype: str) -> bool:
 251    """
 252    Determine whether a given `dtype` string
 253    should be considered compatible with the Meerschaum dtype `numeric`.
 254
 255    Parameters
 256    ----------
 257    dtype: str
 258        The pandas-like dtype string.
 259
 260    Returns
 261    -------
 262    A bool indicating the dtype is compatible with `numeric`.
 263    """
 264    dtype_lower = dtype.lower()
 265
 266    acceptable_substrings = ('numeric', 'float', 'double', 'int')
 267    for substring in acceptable_substrings:
 268        if substring in dtype_lower:
 269            return True
 270
 271    return False
 272
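### For example (sketch): any int-, float-, double-, or numeric-like dtype qualifies.
# >>> is_dtype_numeric('float32')
# True
# >>> is_dtype_numeric('numeric[20,10]')
# True
# >>> is_dtype_numeric('string')
# False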
 273
 274def attempt_cast_to_numeric(
 275    value: Any,
 276    quantize: bool = False,
 277    precision: Optional[int] = None,
 278    scale: Optional[int] = None,
  279) -> Any:
 280    """
 281    Given a value, attempt to coerce it into a numeric (Decimal).
 282
 283    Parameters
 284    ----------
 285    value: Any
 286        The value to be cast to a Decimal.
 287
 288    quantize: bool, default False
 289        If `True`, quantize the decimal to the specified precision and scale.
 290
 291    precision: Optional[int], default None
 292        If `quantize` is `True`, use this precision.
 293
 294    scale: Optional[int], default None
 295        If `quantize` is `True`, use this scale.
 296
 297    Returns
 298    -------
 299    A `Decimal` if possible, or `value`.
 300    """
 301    if isinstance(value, Decimal):
 302        if quantize and precision and scale:
 303            return quantize_decimal(value, precision, scale)
 304        return value
 305    try:
 306        if value_is_null(value):
 307            return Decimal('NaN')
 308
 309        dec = Decimal(str(value))
 310        if not quantize or not precision or not scale:
 311            return dec
 312        return quantize_decimal(dec, precision, scale)
 313    except Exception:
 314        return value
 315
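### Illustrative casts (a sketch): null-like values become Decimal('NaN'),
### and unparseable values are returned unchanged.
# >>> attempt_cast_to_numeric('3.14159')
# Decimal('3.14159')
# >>> attempt_cast_to_numeric('3.14159', quantize=True, precision=4, scale=2)
# Decimal('3.14')
# >>> attempt_cast_to_numeric(None)
# Decimal('NaN')
# >>> attempt_cast_to_numeric('not a number')
# 'not a number'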
 316
 317def attempt_cast_to_uuid(value: Any) -> Any:
 318    """
  319    Given a value, attempt to coerce it into a UUID (`uuid.UUID`).
 320    """
 321    if isinstance(value, uuid.UUID):
 322        return value
 323    try:
 324        return (
 325            uuid.UUID(str(value))
 326            if not value_is_null(value)
 327            else None
 328        )
 329    except Exception:
 330        return value
 331
 332
 333def attempt_cast_to_bytes(value: Any) -> Any:
 334    """
 335    Given a value, attempt to coerce it into a bytestring.
 336    """
 337    if isinstance(value, bytes):
 338        return value
 339    try:
 340        return (
 341            deserialize_bytes_string(str(value))
 342            if not value_is_null(value)
 343            else None
 344        )
 345    except Exception:
 346        return value
 347
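### Illustrative cast (a sketch): base64-encoded strings are decoded via
### `deserialize_bytes_string()` below; raw bytes pass through unchanged.
# >>> attempt_cast_to_bytes('aGVsbG8=')
# b'hello'
# >>> attempt_cast_to_bytes(b'hello')
# b'hello'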
 348
 349def attempt_cast_to_geometry(value: Any) -> Any:
 350    """
 351    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
 352    """
 353    typ = str(type(value))
 354    if 'pandas' in typ and 'Series' in typ:
 355        if 'GeoSeries' in typ:
 356            return value
 357        gpd = mrsm.attempt_import('geopandas')
 358        if len(value) == 0:
 359            return gpd.GeoSeries([])
 360
 361        ix = value.first_valid_index()
 362        if ix is None:
 363            try:
 364                return gpd.GeoSeries(value)
 365            except Exception:
 366                traceback.print_exc()
 367                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 368        sample_val = value[ix]
 369
 370        sample_is_gpkg = geometry_is_gpkg(sample_val)
 371        if sample_is_gpkg:
 372            try:
 373                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
 374            except Exception:
 375                traceback.print_exc()
 376                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 377
 378        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
 379        try:
 380            return (
 381                gpd.GeoSeries.from_wkt(value)
 382                if sample_is_wkt
 383                else gpd.GeoSeries.from_wkb(value)
 384            )
 385        except Exception:
 386            traceback.print_exc()
 387            return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
 388
 389    if 'shapely' in typ:
 390        return value
 391
 392    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
 393        'shapely',
 394        'shapely.wkt',
 395        'shapely.wkb',
 396        lazy=False,
 397    )
 398
 399    if isinstance(value, (dict, list)):
 400        try:
 401            return shapely.from_geojson(json.dumps(value))
 402        except Exception:
 403            return value
 404
 405    value_is_gpkg = geometry_is_gpkg(value)
 406    if value_is_gpkg:
 407        try:
 408            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
 409            return shapely_wkb.loads(wkb_data)
 410        except Exception:
 411            return value
 412
 413    value_is_wkt = geometry_is_wkt(value)
 414    if value_is_wkt is None:
 415        return value
 416
 417    try:
 418        return (
 419            shapely_wkt.loads(value)
 420            if value_is_wkt
 421            else shapely_wkb.loads(value)
 422        )
 423    except Exception:
 424        pass
 425
 426    return value
 427
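### Illustrative cast (a sketch, assuming `shapely` is installed):
### WKT strings, WKB data, and GeoJSON-like dicts are all accepted.
# >>> attempt_cast_to_geometry('POINT (1 2)').wkt
# 'POINT (1 2)'
# >>> attempt_cast_to_geometry({'type': 'Point', 'coordinates': [1.0, 2.0]}).wkt
# 'POINT (1 2)'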
 428
 429def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
 430    """
 431    Determine whether an input value should be treated as WKT or WKB geometry data.
 432
 433    Parameters
 434    ----------
 435    value: Union[str, bytes]
 436        The input data to be parsed into geometry data.
 437
 438    Returns
 439    -------
 440    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
 441    Return `None` if `value` should be parsed as neither.
 442    """
 443    import re
 444    if not isinstance(value, (str, bytes)):
 445        return None
 446
 447    if isinstance(value, bytes):
 448        return False
 449    
 450    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
 451    if re.match(wkt_pattern, value, re.IGNORECASE):
 452        return True
 453    
 454    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
 455        return False
 456    
 457    return None
 458
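### For example (sketch):
# >>> geometry_is_wkt('POINT (1 2)')
# True
# >>> geometry_is_wkt('0101000000000000000000f03f0000000000000040')   # hex-encoded WKB
# False
# >>> geometry_is_wkt(42)   # neither WKT nor WKB: returns None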
 459
 460def geometry_is_gpkg(value: bytes) -> bool:
 461    """
 462    Return whether the input `value` is formatted as GeoPackage WKB.
 463    """
 464    if not isinstance(value, bytes) or len(value) < 2:
 465        return False
 466
 467    return value[0:2] == b'GP'
 468
 469def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
 470    """
 471    Converts GeoPackage WKB to standard WKB by removing the header.
 472    
 473    Parameters
 474    ----------
 475    gpkg_wkb_bytes: bytes
 476        The GeoPackage WKB byte string.
 477        
 478    Returns
 479    -------
 480    A tuple containing the standard WKB bytes, SRID, and flags.
 481    """
 482    magic_number = gpkg_wkb_bytes[0:2]
 483    if magic_number != b'GP':
 484        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")
 485    
 486    try:
 487        header = gpkg_wkb_bytes[0:8]
 488        header_vals = struct.unpack('<ccBBi', header)
 489        flags = header_vals[-2]
 490        srid = header_vals[-1]
 491    except struct.error:
 492        header = gpkg_wkb_bytes[0:6]
 493        header_vals = struct.unpack('<ccBBh', header)
 494        flags = header_vals[-2]
 495        srid = header_vals[-1]
 496
 497    envelope_type = (flags >> 1) & 0x07
 498    envelope_sizes = {
 499        0: 0,
 500        1: 32,
 501        2: 48,
 502        3: 48,
 503        4: 64,
 504    }
 505    header_length = 8 + envelope_sizes.get(envelope_type, 0)
 506    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
 507    return standard_wkb_bytes, srid, flags
 508
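### Illustrative round-trip (a sketch): prepend a minimal GeoPackage header
### (magic 'GP', version 0, flags 1, SRID 4326) to a standard WKB point, then strip it back off.
# >>> point_wkb = bytes.fromhex('0101000000000000000000f03f0000000000000040')   # POINT (1 2)
# >>> gpkg_header = struct.pack('<ccBBi', b'G', b'P', 0, 1, 4326)
# >>> wkb, srid, flags = gpkg_wkb_to_wkb(gpkg_header + point_wkb)
# >>> (wkb == point_wkb, srid, flags)
# (True, 4326, 1)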
 509
 510def value_is_null(value: Any) -> bool:
 511    """
 512    Determine if a value is a null-like string.
 513    """
 514    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
 515
 516
 517def none_if_null(value: Any) -> Any:
 518    """
 519    Return `None` if a value is a null-like string.
 520    """
 521    return (None if value_is_null(value) else value)
 522
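### For example (sketch):
# >>> value_is_null('<NA>')
# True
# >>> none_if_null('NaT') is None
# True
# >>> none_if_null(42)
# 42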
 523
 524def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
 525    """
 526    Quantize a given `Decimal` to a known scale and precision.
 527
 528    Parameters
 529    ----------
 530    x: Decimal
 531        The `Decimal` to be quantized.
 532
 533    precision: int
 534        The total number of significant digits.
 535
 536    scale: int
 537        The number of significant digits after the decimal point.
 538
 539    Returns
 540    -------
 541    A `Decimal` quantized to the specified scale and precision.
 542    """
 543    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
 544    try:
 545        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
 546    except InvalidOperation:
 547        pass
 548
 549    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
 550
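### For example (sketch): `precision` is the total significant digits, `scale` the digits after the point.
# >>> quantize_decimal(Decimal('123.4567'), 7, 2)
# Decimal('123.46')
# >>> quantize_decimal(Decimal('3.14159'), 5, 3)
# Decimal('3.142')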
 551
 552def serialize_decimal(
 553    x: Any,
 554    quantize: bool = False,
 555    precision: Optional[int] = None,
 556    scale: Optional[int] = None,
 557) -> Any:
 558    """
 559    Return a quantized string of an input decimal.
 560
 561    Parameters
 562    ----------
 563    x: Any
 564        The potential decimal to be serialized.
 565
 566    quantize: bool, default False
 567        If `True`, quantize the incoming Decimal to the specified scale and precision
 568        before serialization.
 569
 570    precision: Optional[int], default None
 571        The precision of the decimal to be quantized.
 572
 573    scale: Optional[int], default None
 574        The scale of the decimal to be quantized.
 575
 576    Returns
 577    -------
 578    A string of the input decimal or the input if not a Decimal.
 579    """
 580    if not isinstance(x, Decimal):
 581        return x
 582
 583    if value_is_null(x):
 584        return None
 585
 586    if quantize and scale and precision:
 587        x = quantize_decimal(x, precision, scale)
 588
 589    return f"{x:f}"
 590
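### For example (sketch): scientific notation is expanded and non-Decimals pass through.
# >>> serialize_decimal(Decimal('1E+2'))
# '100'
# >>> serialize_decimal(Decimal('3.14159'), quantize=True, precision=4, scale=2)
# '3.14'
# >>> serialize_decimal('not a decimal')
# 'not a decimal'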
 591
 592def coerce_timezone(
 593    dt: Any,
 594    strip_utc: bool = False,
 595) -> Any:
 596    """
 597    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
  598    return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
 599    """
 600    if dt is None:
 601        return None
 602
 603    if isinstance(dt, int):
 604        return dt
 605
 606    if isinstance(dt, str):
 607        dateutil_parser = mrsm.attempt_import('dateutil.parser')
 608        try:
 609            dt = dateutil_parser.parse(dt)
 610        except Exception:
 611            return dt
 612
 613    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
 614    if dt_is_series:
 615        pandas = mrsm.attempt_import('pandas', lazy=False)
 616
 617        if (
 618            pandas.api.types.is_datetime64_any_dtype(dt) and (
 619                (dt.dt.tz is not None and not strip_utc)
 620                or
 621                (dt.dt.tz is None and strip_utc)
 622            )
 623        ):
 624            return dt
 625
 626        dt_series = to_datetime(dt, coerce_utc=False)
 627        if dt_series.dt.tz is None:
 628            dt_series = dt_series.dt.tz_localize(timezone.utc)
 629        if strip_utc:
 630            try:
 631                if dt_series.dt.tz is not None:
 632                    dt_series = dt_series.dt.tz_localize(None)
 633            except Exception:
 634                pass
 635
 636        return dt_series
 637
 638    if dt.tzinfo is None:
 639        if strip_utc:
 640            return dt
 641        return dt.replace(tzinfo=timezone.utc)
 642
 643    utc_dt = dt.astimezone(timezone.utc)
 644    if strip_utc:
 645        return utc_dt.replace(tzinfo=None)
 646    return utc_dt
 647
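### For example (sketch): naive datetimes are assumed to be UTC.
# >>> coerce_timezone(datetime(2024, 1, 1))
# datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
# >>> coerce_timezone(datetime(2024, 1, 1, tzinfo=timezone.utc), strip_utc=True)
# datetime.datetime(2024, 1, 1, 0, 0)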
 648
 649def to_datetime(
 650    dt_val: Any,
 651    as_pydatetime: bool = False,
 652    coerce_utc: bool = True,
 653    precision_unit: Optional[str] = None,
 654) -> Any:
 655    """
 656    Wrap `pd.to_datetime()` and add support for out-of-bounds values.
 657
 658    Parameters
 659    ----------
 660    dt_val: Any
 661        The value to coerce to Pandas Timestamps.
 662
 663    as_pydatetime: bool, default False
 664        If `True`, return a Python datetime object.
 665
 666    coerce_utc: bool, default True
 667        If `True`, ensure the value has UTC tzinfo.
 668
 669    precision_unit: Optional[str], default None
 670        If provided, enforce the provided precision unit.
 671    """
 672    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
 673    is_dask = 'dask' in getattr(dt_val, '__module__', '')
 674    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
 675    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
 676    pd = pandas if dd is None else dd
 677    enforce_precision = precision_unit is not None
 678    precision_unit = precision_unit or 'microsecond'
 679    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
 680    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
 681    if not precision_abbreviation:
 682        raise ValueError(f"Invalid precision '{precision_unit}'.")
 683
 684    def parse(x: Any) -> Any:
 685        try:
 686            return dateutil_parser.parse(x)
 687        except Exception:
 688            return x
 689
 690    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
 691        dtype_check_against = (
 692            f"datetime64[{precision_abbreviation}, UTC]"
 693            if with_utc
 694            else f"datetime64[{precision_abbreviation}]"
 695        )
 696        return (
 697            dtype_to_check == dtype_check_against
 698            if enforce_precision
 699            else (
 700                dtype_to_check.startswith('datetime64[')
 701                and (
 702                    ('utc' in dtype_to_check.lower())
 703                    if with_utc
 704                    else ('utc' not in dtype_to_check.lower())
 705                )
 706            )
 707        )
 708
 709    if isinstance(dt_val, pd.Timestamp):
 710        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
 711        return (
 712            coerce_timezone(dt_val_to_return)
 713            if coerce_utc
 714            else dt_val_to_return
 715        )
 716
 717    if dt_is_series:
 718        changed_tz = False
 719        original_tz = None
 720        dtype = str(getattr(dt_val, 'dtype', 'object'))
 721        if (
 722            are_dtypes_equal(dtype, 'datetime')
 723            and 'utc' not in dtype.lower()
 724            and hasattr(dt_val, 'dt')
 725        ):
 726            original_tz = dt_val.dt.tz
 727            dt_val = dt_val.dt.tz_localize(timezone.utc)
 728            changed_tz = True
 729            dtype = str(getattr(dt_val, 'dtype', 'object'))
 730        try:
 731            new_dt_series = (
 732                dt_val
 733                if check_dtype(dtype, with_utc=True)
 734                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
 735            )
 736        except pd.errors.OutOfBoundsDatetime:
 737            try:
 738                next_precision = get_next_precision_unit(true_precision_unit)
 739                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
 740                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
 741            except Exception:
 742                new_dt_series = None
 743        except ValueError:
 744            new_dt_series = None
 745        except TypeError:
 746            try:
 747                new_dt_series = (
 748                    new_dt_series
 749                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
 750                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
 751                )
 752            except Exception:
 753                new_dt_series = None
 754
 755        if new_dt_series is None:
 756            new_dt_series = dt_val.apply(lambda x: parse(str(x)))
 757
 758        if coerce_utc:
 759            return coerce_timezone(new_dt_series)
 760
 761        if changed_tz:
 762            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
 763        return new_dt_series
 764
 765    try:
 766        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
 767        if new_dt_val.unit != precision_abbreviation:
 768            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
 769        if as_pydatetime:
 770            return new_dt_val.to_pydatetime()
 771        return new_dt_val
 772    except (pd.errors.OutOfBoundsDatetime, ValueError):
 773        pass
 774
 775    new_dt_val = parse(dt_val)
 776    if not coerce_utc:
 777        return new_dt_val
 778    return coerce_timezone(new_dt_val)
 779
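### For example (sketch, assuming pandas is installed): ISO strings are coerced
### to UTC pandas Timestamps at the requested (or default) precision.
# >>> to_datetime('2024-01-01')
# Timestamp('2024-01-01 00:00:00+0000', tz='UTC')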
 780
 781def serialize_bytes(data: bytes) -> str:
 782    """
 783    Return the given bytes as a base64-encoded string.
 784    """
 785    import base64
 786    if not isinstance(data, bytes) and value_is_null(data):
 787        return data
 788    return base64.b64encode(data).decode('utf-8')
 789
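### For example (sketch):
# >>> serialize_bytes(b'hello')
# 'aGVsbG8='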
 790
 791def serialize_geometry(
 792    geom: Any,
 793    geometry_format: str = 'wkb_hex',
 794    srid: Optional[int] = None,
 795) -> Union[str, Dict[str, Any], bytes, None]:
 796    """
 797    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 
 798
 799    Parameters
 800    ----------
 801    geom: Any
 802        The potential geometry data to be serialized.
 803
 804    geometry_format: str, default 'wkb_hex'
 805        The serialization format for geometry data.
 806        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
 807
 808    srid: Optional[int], default None
 809        If provided, use this as the source CRS when serializing to GeoJSON.
 810
 811    Returns
 812    -------
 813    A string containing the geometry data, or bytes, or a dictionary, or None.
 814    """
 815    if value_is_null(geom):
 816        return None
 817
 818    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
 819        'shapely', 'shapely.ops', 'pyproj', 'numpy',
 820        lazy=False,
 821    )
 822    if geometry_format == 'geojson':
 823        if srid:
 824            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
 825            geom = shapely_ops.transform(transformer.transform, geom)
 826        geojson_str = shapely.to_geojson(geom)
 827        return json.loads(geojson_str)
 828
 829    if not hasattr(geom, 'wkb_hex'):
 830        return str(geom)
 831
 832    byte_order = 1 if np.little_endian else 0
 833
 834    if geometry_format.startswith("wkb"):
 835        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
 836    elif geometry_format == 'gpkg_wkb':
 837        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
 838        flags = (
 839            ((byte_order & 0x01) | (0x20))
 840            if geom.is_empty
 841            else (byte_order & 0x01)
 842        )
 843        srid_val = srid or -1
 844        header = struct.pack(
 845            '<ccBBi',
 846            b'G', b'P',
 847            0,
 848            flags,
 849            srid_val
 850        )
 851        return header + wkb_data
 852
 853    return shapely.to_wkt(geom)
 854
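### Illustrative serializations (a sketch, assuming `shapely`, `pyproj`, and `numpy` are installed):
# >>> from shapely.geometry import Point
# >>> serialize_geometry(Point(1, 2))
# '0101000000000000000000F03F0000000000000040'
# >>> serialize_geometry(Point(1, 2), geometry_format='wkt')
# 'POINT (1 2)'
# >>> serialize_geometry(Point(1, 2), geometry_format='geojson')
# {'type': 'Point', 'coordinates': [1.0, 2.0]}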
 855
 856def deserialize_geometry(geom_wkb: Union[str, bytes]):
 857    """
 858    Deserialize a WKB string into a shapely geometry object.
 859    """
 860    shapely = mrsm.attempt_import('shapely', lazy=False)
 861    return shapely.wkb.loads(geom_wkb)
 862
 863
 864def project_geometry(geom, srid: int, to_srid: int = 4326):
 865    """
 866    Project a shapely geometry object to a new CRS (SRID).
 867    """
 868    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
 869    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
 870    return shapely_ops.transform(transformer.transform, geom)
 871
 872
 873def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
 874    """
 875    Given a serialized ASCII string of bytes data, return the original bytes.
 876    The input data may either be base64- or hex-encoded.
 877
 878    Parameters
 879    ----------
 880    data: Optional[str]
 881        The string to be deserialized into bytes.
 882        May be base64- or hex-encoded (prefixed with `'\\x'`).
 883
 884    force_hex: bool = False
 885        If `True`, treat the input string as hex-encoded.
 886        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
 887        This will still strip the leading `'\\x'` prefix if present.
 888
 889    Returns
 890    -------
 891    The original bytes used to produce the encoded string `data`.
 892    """
 893    if not isinstance(data, str) and value_is_null(data):
 894        return data
 895
 896    import binascii
 897    import base64
 898
 899    is_hex = force_hex or data.startswith('\\x')
 900
 901    if is_hex:
 902        if data.startswith('\\x'):
 903            data = data[2:]
 904        return binascii.unhexlify(data)
 905
 906    return base64.b64decode(data)
 907
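### For example (sketch): both base64 and hex-encoded (PostgreSQL-style) strings are supported.
# >>> deserialize_bytes_string('aGVsbG8=')
# b'hello'
# >>> deserialize_bytes_string('\\x68656c6c6f')
# b'hello'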
 908
 909def deserialize_base64(data: str) -> bytes:
 910    """
 911    Return the original bytestring from the given base64-encoded string.
 912    """
 913    import base64
 914    return base64.b64decode(data)
 915
 916
 917def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
 918    """
 919    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
 920    """
 921    import binascii
 922    if not isinstance(data, bytes) and value_is_null(data):
 923        return data
 924    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
 925
 926
 927def serialize_datetime(dt: datetime) -> Union[str, None]:
 928    """
 929    Serialize a datetime object into JSON (ISO format string).
 930
 931    Examples
 932    --------
 933    >>> import json
 934    >>> from datetime import datetime
  935    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
 936    '{"a": "2022-01-01T00:00:00Z"}'
 937
 938    """
 939    if not hasattr(dt, 'isoformat'):
 940        return None
 941
 942    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
 943    return dt.isoformat() + tz_suffix
 944
 945
 946def serialize_date(d: date) -> Union[str, None]:
 947    """
 948    Serialize a date object into its ISO representation.
 949    """
 950    return d.isoformat() if hasattr(d, 'isoformat') else None
 951
 952
 953def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
 954    """
 955    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
 956
 957    Parameters
 958    ----------
 959    x: Any
 960        The value to serialize.
 961
 962    default_to_str: bool, default True
 963        If `True`, return a string of `x` if x is not a designated type.
 964        Otherwise return x.
 965
 966    Returns
 967    -------
 968    A serialized version of x, or x.
 969    """
 970    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
 971        return x.meta
 972
 973    if hasattr(x, 'tzinfo'):
 974        return serialize_datetime(x)
 975
 976    if hasattr(x, 'isoformat'):
 977        return serialize_date(x)
 978
 979    if isinstance(x, bytes):
 980        return serialize_bytes(x)
 981
 982    if isinstance(x, Decimal):
 983        return serialize_decimal(x)
 984
 985    if 'shapely' in str(type(x)):
 986        return serialize_geometry(x)
 987
 988    if value_is_null(x):
 989        return None
 990
 991    if isinstance(x, (dict, list, tuple)):
 992        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
 993
 994    return str(x) if default_to_str else x
 995
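### Illustrative usage (a sketch), typically passed as `default=` to `json.dumps`:
# >>> json.dumps(
# ...     {'ts': datetime(2024, 1, 1), 'amount': Decimal('1.50'), 'blob': b'hello'},
# ...     default=json_serialize_value,
# ... )
# '{"ts": "2024-01-01T00:00:00Z", "amount": "1.50", "blob": "aGVsbG8="}'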
 996
 997def get_geometry_type_srid(
 998    dtype: str = 'geometry',
 999    default_type: str = 'geometry',
1000    default_srid: int = 4326,
1001) -> Union[Tuple[str, int], Tuple[str, None]]:
1002    """
1003    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).
1004
1005    Parameters
1006    ----------
 1007    dtype: str, default 'geometry'
1008        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
1009        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
1010
1011        - `Point`
1012        - `LineString`
1013        - `LinearRing`
1014        - `Polygon`
1015        - `MultiPoint`
1016        - `MultiLineString`
1017        - `MultiPolygon`
1018        - `GeometryCollection`
1019
1020    Returns
1021    -------
1022    A tuple in the form (type, SRID).
1023    Defaults to `(default_type, default_srid)`.
1024
1025    Examples
1026    --------
1027    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
1028    >>> get_geometry_type_srid()
1029    ('geometry', 4326)
1030    >>> get_geometry_type_srid('geometry[]')
1031    ('geometry', 4326)
1032    >>> get_geometry_type_srid('geometry[Point, 0]')
1033    ('Point', 0)
1034    >>> get_geometry_type_srid('geometry[0, Point]')
1035    ('Point', 0)
1036    >>> get_geometry_type_srid('geometry[0]')
1037    ('geometry', 0)
1038    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
1039    ('MultiLineString', 4326)
1040    >>> get_geometry_type_srid('geography')
1041    ('geometry', 4326)
1042    >>> get_geometry_type_srid('geography[POINT]')
 1043    ('Point', 4326)
1044    """
1045    from meerschaum.utils.misc import is_int
1046    ### NOTE: PostGIS syntax must also be parsed.
1047    dtype = dtype.replace('(', '[').replace(')', ']')
1048    bare_dtype = dtype.split('[', maxsplit=1)[0]
1049    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
1050    if not modifier:
1051        return default_type, default_srid
1052
1053    parts = [
1054        part.split('=')[-1].strip()
1055        for part in modifier.split(',')
1056    ]
1057    parts_casted = [
1058        (
1059            int(part)
1060            if is_int(part)
1061            else part
1062        )
1063        for part in parts
1064    ]
1065
1066    srid = default_srid
1067    geometry_type = default_type
1068
1069    for part in parts_casted:
1070        if isinstance(part, int):
1071            srid = part
1072            break
1073
1074    for part in parts_casted:
1075        if isinstance(part, str):
1076            geometry_type = part
1077            break
1078
1079    return geometry_type, srid
1080
1081
1082def get_current_timestamp(
1083    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
1084    precision_interval: int = 1,
1085    round_to: str = 'down',
1086    as_pandas: bool = False,
1087    as_int: bool = False,
1088    _now: Union[datetime, int, None] = None,
1089) -> 'Union[datetime, pd.Timestamp, int]':
1090    """
 1091    Return the current UTC timestamp at the specified precision.
1092
1093    Parameters
1094    ----------
1095    precision_unit: str, default 'us'
1096        The precision of the timestamp to be returned.
1097        Valid values are the following:
1098            - `ns` / `nanosecond`
1099            - `us` / `microsecond`
1100            - `ms` / `millisecond`
1101            - `s` / `sec` / `second`
1102            - `m` / `min` / `minute`
1103            - `h` / `hr` / `hour`
1104            - `d` / `day`
1105
1106    precision_interval: int, default 1
1107        Round the timestamp to the `precision_interval` units.
 1108        For example, `precision_unit='minute'` and `precision_interval=15` will round to 15-minute intervals.
 1109        Note: `precision_interval` must be 1 when `precision_unit='nanosecond'`.
1110
1111    round_to: str, default 'down'
1112        The direction to which to round the timestamp.
1113        Available options are `down`, `up`, and `closest`.
1114
1115    as_pandas: bool, default False
1116        If `True`, return a Pandas Timestamp.
 1117        This is always the case if `precision_unit` is `nanosecond`.
1118
1119    as_int: bool, default False
 1120        If `True`, return the timestamp as an integer.
1121        Overrides `as_pandas`.
1122
1123    Returns
1124    -------
 1125    A Pandas Timestamp, datetime object, or integer at the provided precision unit.
1126
1127    Examples
1128    --------
1129    >>> get_current_timestamp('ns')
1130    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
1131    >>> get_current_timestamp('ms')
1132    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
1133    """
1134    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1135    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
1136        from meerschaum.utils.misc import items_str
1137        raise ValueError(
1138            f"Unknown precision unit '{precision_unit}'. "
1139            "Accepted values are "
1140            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
1141        )
1142
1143    if not as_int:
1144        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
1145    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None
1146
1147    if true_precision_unit == 'nanosecond':
1148        if precision_interval != 1:
1149            warn("`precision_interval` must be 1 for nanosecond precision.")
1150        now_ts = time.time_ns() if not isinstance(_now, int) else _now
1151        if as_int:
1152            return now_ts
1153        return pd.to_datetime(now_ts, unit='ns', utc=True)
1154
1155    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
1156    delta = timedelta(**{true_precision_unit + 's': precision_interval})
1157    rounded_now = round_time(now, delta, to=round_to)
1158
1159    if as_int:
1160        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])
1161
1162    ts_val = (
1163        pd.to_datetime(rounded_now, utc=True)
1164        if as_pandas
1165        else rounded_now
1166    )
1167
1168    if not as_pandas:
1169        return ts_val
1170
1171    as_unit_precisions = ('microsecond', 'millisecond', 'second')
1172    if true_precision_unit not in as_unit_precisions:
1173        return ts_val
1174
1175    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
1176
1177
1178def is_dtype_special(type_: str) -> bool:
1179    """
1180    Return whether a dtype should be treated as a special Meerschaum dtype.
1181    This is not the same as a Meerschaum alias.
1182    """
1183    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
1184    if true_type in (
1185        'uuid',
1186        'json',
1187        'bytes',
1188        'numeric',
1189        'datetime',
1190        'geometry',
1191        'geography',
1192        'date',
1193        'bool',
1194    ):
1195        return True
1196
1197    if are_dtypes_equal(true_type, 'datetime'):
1198        return True
1199
1200    if are_dtypes_equal(true_type, 'date'):
1201        return True
1202
1203    if true_type.startswith('numeric'):
1204        return True
1205
1206    if true_type.startswith('bool'):
1207        return True
1208
1209    if true_type.startswith('geometry'):
1210        return True
1211
1212    if true_type.startswith('geography'):
1213        return True
1214
1215    return False
1216
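### For example (sketch):
# >>> is_dtype_special('numeric[28,8]')
# True
# >>> is_dtype_special('guid')   # alias of 'uuid'
# True
# >>> is_dtype_special('int64[pyarrow]')
# False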
1217
1218def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
1219    """
1220    Get the next precision string in order of value.
1221
1222    Parameters
1223    ----------
1224    precision_unit: str
1225        The precision string (`'nanosecond'`, `'ms'`, etc.).
1226
 1227    decrease: bool, default True
 1228        If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
1229        If `False`, return the precision unit which is higher.
1230
1231    Returns
1232    -------
1233    A `precision` string which is lower or higher than the given precision unit.
1234
1235    Examples
1236    --------
1237    >>> get_next_precision_unit('nanosecond')
1238    'microsecond'
1239    >>> get_next_precision_unit('ms')
1240    'second'
1241    >>> get_next_precision_unit('hour', decrease=False)
1242    'minute'
1243    """
1244    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1245    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
1246    if not precision_scalar:
1247        raise ValueError(f"Invalid precision unit '{precision_unit}'.")
1248
1249    precisions = sorted(
1250        list(MRSM_PRECISION_UNITS_SCALARS),
1251        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
1252    )
1253
1254    precision_index = precisions.index(true_precision_unit)
1255    new_precision_index = precision_index + (-1 if decrease else 1)
1256    if new_precision_index < 0 or new_precision_index >= len(precisions):
1257        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")
1258
1259    return precisions[new_precision_index]
1260
1261
1262def round_time(
1263    dt: Optional[datetime] = None,
1264    date_delta: Optional[timedelta] = None,
1265    to: 'str' = 'down'
1266) -> datetime:
1267    """
1268    Round a datetime object to a multiple of a timedelta.
1269
1270    Parameters
1271    ----------
1272    dt: Optional[datetime], default None
1273        If `None`, grab the current UTC datetime.
1274
1275    date_delta: Optional[timedelta], default None
1276        If `None`, use a delta of 1 minute.
1277
1278    to: 'str', default 'down'
1279        Available options are `'up'`, `'down'`, and `'closest'`.
1280
1281    Returns
1282    -------
1283    A rounded `datetime` object.
1284
1285    Examples
1286    --------
1287    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
1288    datetime.datetime(2022, 1, 1, 12, 15)
1289    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
1290    datetime.datetime(2022, 1, 1, 12, 16)
1291    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
1292    datetime.datetime(2022, 1, 1, 12, 0)
1293    >>> round_time(
1294    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
1295    ...   timedelta(hours=1),
1296    ...   to = 'closest'
1297    ... )
1298    datetime.datetime(2022, 1, 1, 12, 0)
1299    >>> round_time(
1300    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
 1301    ...   timedelta(hours=1),
1302    ...   to = 'closest'
1303    ... )
1304    datetime.datetime(2022, 1, 1, 13, 0)
1305
1306    """
1307    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
1308    if date_delta is None:
1309        date_delta = timedelta(minutes=1)
1310
1311    if dt is None:
1312        dt = datetime.now(timezone.utc).replace(tzinfo=None)
1313
1314    def get_total_microseconds(td: timedelta) -> int:
1315        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds
1316
1317    round_to_microseconds = get_total_microseconds(date_delta)
1318    if round_to_microseconds == 0:
1319        return dt
1320
1321    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
1322    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)
1323
1324    dt_dec = Decimal(dt_total_microseconds)
1325    round_to_dec = Decimal(round_to_microseconds)
1326
1327    div = dt_dec / round_to_dec
1328    if to == 'down':
1329        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
1330    elif to == 'up':
1331        num_intervals = div.to_integral_value(rounding=ROUND_UP)
1332    else:
1333        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)
1334
1335    rounded_dt_total_microseconds = num_intervals * round_to_dec
1336    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds
1337
1338    return dt + timedelta(microseconds=adjustment_microseconds)
MRSM_ALIAS_DTYPES: Dict[str, str] = {'decimal': 'numeric', 'Decimal': 'numeric', 'number': 'numeric', 'jsonl': 'json', 'JSON': 'json', 'binary': 'bytes', 'blob': 'bytes', 'varbinary': 'bytes', 'bytea': 'bytes', 'guid': 'uuid', 'UUID': 'uuid', 'geom': 'geometry', 'geog': 'geography', 'boolean': 'bool', 'day': 'date'}
MRSM_PD_DTYPES: Dict[Optional[str], str] = {'json': 'object', 'numeric': 'object', 'geometry': 'object', 'geography': 'object', 'uuid': 'object', 'date': 'date32[day][pyarrow]', 'datetime': 'datetime64[us, UTC]', 'bool': 'bool[pyarrow]', 'int': 'int64[pyarrow]', 'int8': 'int8[pyarrow]', 'int16': 'int16[pyarrow]', 'int32': 'int32[pyarrow]', 'int64': 'int64[pyarrow]', 'str': 'string', 'bytes': 'binary[pyarrow]', None: 'object'}
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {'nanosecond': 1000000000, 'microsecond': 1000000, 'millisecond': 1000, 'second': 1, 'minute': 0.016666666666666666, 'hour': 0.0002777777777777778, 'day': 1.1574074074074073e-05}
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {'ns': 'nanosecond', 'us': 'microsecond', 'ms': 'millisecond', 's': 'second', 'sec': 'second', 'm': 'minute', 'min': 'minute', 'h': 'hour', 'hr': 'hour', 'd': 'day', 'D': 'day'}
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {'nanosecond': 'ns', 'microsecond': 'us', 'millisecond': 'ms', 'second': 's', 'minute': 'min', 'hour': 'hr', 'day': 'D'}
def to_pandas_dtype(dtype: str) -> str:
 93def to_pandas_dtype(dtype: str) -> str:
 94    """
 95    Cast a supported Meerschaum dtype to a Pandas dtype.
 96    """
 97    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
 98    if known_dtype is not None:
 99        return known_dtype
100
101    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
102    if alias_dtype is not None:
103        return MRSM_PD_DTYPES[alias_dtype]
104
105    if dtype.startswith('numeric'):
106        return MRSM_PD_DTYPES['numeric']
107
108    if dtype.startswith('geometry'):
109        return MRSM_PD_DTYPES['geometry']
110
111    if dtype.startswith('geography'):
112        return MRSM_PD_DTYPES['geography']
113
114    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
115    ### treat it as a SQL db type.
116    if dtype.split(' ')[0].isupper():
117        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
118        return get_pd_type_from_db_type(dtype)
119
120    from meerschaum.utils.packages import attempt_import
121    _ = attempt_import('pyarrow', lazy=False)
122    pandas = attempt_import('pandas', lazy=False)
123
124    try:
125        return str(pandas.api.types.pandas_dtype(dtype))
126    except Exception:
127        warn(
128            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
129            + f"{traceback.format_exc()}",
130            stack=False,
131        )
132    return 'object'

Cast a supported Meerschaum dtype to a Pandas dtype.

def are_dtypes_equal( ldtype: Union[str, Dict[str, str]], rdtype: Union[str, Dict[str, str]]) -> bool:
135def are_dtypes_equal(
136    ldtype: Union[str, Dict[str, str]],
137    rdtype: Union[str, Dict[str, str]],
138) -> bool:
139    """
140    Determine whether two dtype strings may be considered
141    equivalent to avoid unnecessary conversions.
142
143    Parameters
144    ----------
145    ldtype: Union[str, Dict[str, str]]
146        The left dtype to compare.
147        May also provide a dtypes dictionary.
148
149    rdtype: Union[str, Dict[str, str]]
150        The right dtype to compare.
151        May also provide a dtypes dictionary.
152
153    Returns
154    -------
155    A `bool` indicating whether the two dtypes are to be considered equivalent.
156    """
157    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
158        lkeys = sorted([str(k) for k in ldtype.keys()])
159        rkeys = sorted([str(k) for k in rdtype.keys()])
160        for lkey, rkey in zip(lkeys, rkeys):
161            if lkey != rkey:
162                return False
163            ltype = ldtype[lkey]
164            rtype = rdtype[rkey]
165            if not are_dtypes_equal(ltype, rtype):
166                return False
167        return True
168
169    try:
170        if ldtype == rdtype:
171            return True
172    except Exception:
173        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
174        return False
175
176    ### Sometimes pandas dtype objects are passed.
177    ldtype = str(ldtype).split('[', maxsplit=1)[0]
178    rdtype = str(rdtype).split('[', maxsplit=1)[0]
179
180    if ldtype in MRSM_ALIAS_DTYPES:
181        ldtype = MRSM_ALIAS_DTYPES[ldtype]
182
183    if rdtype in MRSM_ALIAS_DTYPES:
184        rdtype = MRSM_ALIAS_DTYPES[rdtype]
185
186    json_dtypes = ('json', 'object')
187    if ldtype in json_dtypes and rdtype in json_dtypes:
188        return True
189
190    numeric_dtypes = ('numeric', 'decimal', 'object')
191    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
192        return True
193
194    uuid_dtypes = ('uuid', 'object')
195    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
196        return True
197
198    bytes_dtypes = ('bytes', 'object', 'binary')
199    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
200        return True
201
202    geometry_dtypes = ('geometry', 'object', 'geography')
203    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
204        return True
205
206    if ldtype.lower() == rdtype.lower():
207        return True
208
209    datetime_dtypes = ('datetime', 'timestamp')
210    ldtype_found_dt_prefix = False
211    rdtype_found_dt_prefix = False
212    for dt_prefix in datetime_dtypes:
213        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
214        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
215    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
216        return True
217
218    string_dtypes = ('str', 'string', 'object')
219    if ldtype in string_dtypes and rdtype in string_dtypes:
220        return True
221
222    int_dtypes = (
223        'int', 'int64', 'int32', 'int16', 'int8',
224        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
225    )
226    int_substrings = ('int',)
227    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
228        return True
229    for substring in int_substrings:
230        if substring in ldtype.lower() and substring in rdtype.lower():
231            return True
232
233    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
234    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
235        return True
236
237    bool_dtypes = ('bool', 'boolean')
238    if ldtype in bool_dtypes and rdtype in bool_dtypes:
239        return True
240
241    date_dtypes = (
242        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
243        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
244    )
245    if ldtype in date_dtypes and rdtype in date_dtypes:
246        return True
247
248    return False

Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.

Parameters
  • ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
  • rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
  • A bool indicating whether the two dtypes are to be considered equivalent.
def is_dtype_numeric(dtype: str) -> bool:
251def is_dtype_numeric(dtype: str) -> bool:
252    """
253    Determine whether a given `dtype` string
254    should be considered compatible with the Meerschaum dtype `numeric`.
255
256    Parameters
257    ----------
258    dtype: str
259        The pandas-like dtype string.
260
261    Returns
262    -------
263    A bool indicating the dtype is compatible with `numeric`.
264    """
265    dtype_lower = dtype.lower()
266
267    acceptable_substrings = ('numeric', 'float', 'double', 'int')
268    for substring in acceptable_substrings:
269        if substring in dtype_lower:
270            return True
271
272    return False

Determine whether a given dtype string should be considered compatible with the Meerschaum dtype numeric.

Parameters
  • dtype (str): The pandas-like dtype string.
Returns
  • A bool indicating the dtype is compatible with numeric.
def attempt_cast_to_numeric( value: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
275def attempt_cast_to_numeric(
276    value: Any,
277    quantize: bool = False,
278    precision: Optional[int] = None,
279    scale: Optional[int] = None,
280)-> Any:
281    """
282    Given a value, attempt to coerce it into a numeric (Decimal).
283
284    Parameters
285    ----------
286    value: Any
287        The value to be cast to a Decimal.
288
289    quantize: bool, default False
290        If `True`, quantize the decimal to the specified precision and scale.
291
292    precision: Optional[int], default None
293        If `quantize` is `True`, use this precision.
294
295    scale: Optional[int], default None
296        If `quantize` is `True`, use this scale.
297
298    Returns
299    -------
300    A `Decimal` if possible, or `value`.
301    """
302    if isinstance(value, Decimal):
303        if quantize and precision and scale:
304            return quantize_decimal(value, precision, scale)
305        return value
306    try:
307        if value_is_null(value):
308            return Decimal('NaN')
309
310        dec = Decimal(str(value))
311        if not quantize or not precision or not scale:
312            return dec
313        return quantize_decimal(dec, precision, scale)
314    except Exception:
315        return value

Given a value, attempt to coerce it into a numeric (Decimal).

Parameters
  • value (Any): The value to be cast to a Decimal.
  • quantize (bool, default False): If True, quantize the decimal to the specified precision and scale.
  • precision (Optional[int], default None): If quantize is True, use this precision.
  • scale (Optional[int], default None): If quantize is True, use this scale.
Returns
  • A Decimal if possible, or value.
def attempt_cast_to_uuid(value: Any) -> Any:
318def attempt_cast_to_uuid(value: Any) -> Any:
319    """
320    Given a value, attempt to coerce it into a UUID (`uuid4`).
321    """
322    if isinstance(value, uuid.UUID):
323        return value
324    try:
325        return (
326            uuid.UUID(str(value))
327            if not value_is_null(value)
328            else None
329        )
330    except Exception:
331        return value

Given a value, attempt to coerce it into a UUID (uuid4).

def attempt_cast_to_bytes(value: Any) -> Any:
334def attempt_cast_to_bytes(value: Any) -> Any:
335    """
336    Given a value, attempt to coerce it into a bytestring.
337    """
338    if isinstance(value, bytes):
339        return value
340    try:
341        return (
342            deserialize_bytes_string(str(value))
343            if not value_is_null(value)
344            else None
345        )
346    except Exception:
347        return value

Given a value, attempt to coerce it into a bytestring.

def attempt_cast_to_geometry(value: Any) -> Any:
350def attempt_cast_to_geometry(value: Any) -> Any:
351    """
352    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
353    """
354    typ = str(type(value))
355    if 'pandas' in typ and 'Series' in typ:
356        if 'GeoSeries' in typ:
357            return value
358        gpd = mrsm.attempt_import('geopandas')
359        if len(value) == 0:
360            return gpd.GeoSeries([])
361
362        ix = value.first_valid_index()
363        if ix is None:
364            try:
365                return gpd.GeoSeries(value)
366            except Exception:
367                traceback.print_exc()
368                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
369        sample_val = value[ix]
370
371        sample_is_gpkg = geometry_is_gpkg(sample_val)
372        if sample_is_gpkg:
373            try:
374                value = value.apply(lambda x: gpkg_wkb_to_wkb(x)[0])
375            except Exception:
376                traceback.print_exc()
377                return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
378
379        sample_is_wkt = geometry_is_wkt(sample_val) if not sample_is_gpkg else False
380        try:
381            return (
382                gpd.GeoSeries.from_wkt(value)
383                if sample_is_wkt
384                else gpd.GeoSeries.from_wkb(value)
385            )
386        except Exception:
387            traceback.print_exc()
388            return gpd.GeoSeries(attempt_cast_to_geometry(val) for val in value)
389
390    if 'shapely' in typ:
391        return value
392
393    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
394        'shapely',
395        'shapely.wkt',
396        'shapely.wkb',
397        lazy=False,
398    )
399
400    if isinstance(value, (dict, list)):
401        try:
402            return shapely.from_geojson(json.dumps(value))
403        except Exception:
404            return value
405
406    value_is_gpkg = geometry_is_gpkg(value)
407    if value_is_gpkg:
408        try:
409            wkb_data, _, _ = gpkg_wkb_to_wkb(value)
410            return shapely_wkb.loads(wkb_data)
411        except Exception:
412            return value
413
414    value_is_wkt = geometry_is_wkt(value)
415    if value_is_wkt is None:
416        return value
417
418    try:
419        return (
420            shapely_wkt.loads(value)
421            if value_is_wkt
422            else shapely_wkb.loads(value)
423        )
424    except Exception:
425        pass
426
427    return value

Given a value, attempt to coerce it into a shapely (geometry) object.
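Illustrative usage, assuming `shapely` is installed (and `geopandas` for Series inputs); both calls below return a shapely Point:
>>> attempt_cast_to_geometry('POINT (0 0)')                                 # from WKT
>>> attempt_cast_to_geometry({'type': 'Point', 'coordinates': [0.0, 0.0]})  # from a GeoJSON-like dict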

def geometry_is_wkt(value: Union[str, bytes]) -> Optional[bool]:
430def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
431    """
432    Determine whether an input value should be treated as WKT or WKB geometry data.
433
434    Parameters
435    ----------
436    value: Union[str, bytes]
437        The input data to be parsed into geometry data.
438
439    Returns
440    -------
441    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
442    Return `None` if `value` should be parsed as neither.
443    """
444    import re
445    if not isinstance(value, (str, bytes)):
446        return None
447
448    if isinstance(value, bytes):
449        return False
450    
451    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
452    if re.match(wkt_pattern, value, re.IGNORECASE):
453        return True
454    
455    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
456        return False
457    
458    return None

Determine whether an input value should be treated as WKT or WKB geometry data.

Parameters
  • value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
  • A bool (True if value is WKT and False if it should be treated as WKB).
  • Return None if value should be parsed as neither.
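Illustrative usage (not from the source):
>>> geometry_is_wkt('POINT (1 2)')
True
>>> geometry_is_wkt('0101000000')   # even-length hex string, treated as WKB
False
>>> geometry_is_wkt(b'\x01\x01')    # raw bytes are always treated as WKB
False
>>> geometry_is_wkt('hello world') is None
True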
def geometry_is_gpkg(value: bytes) -> bool:
461def geometry_is_gpkg(value: bytes) -> bool:
462    """
463    Return whether the input `value` is formatted as GeoPackage WKB.
464    """
465    if not isinstance(value, bytes) or len(value) < 2:
466        return False
467
468    return value[0:2] == b'GP'

Return whether the input value is formatted as GeoPackage WKB.

def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
470def gpkg_wkb_to_wkb(gpkg_wkb_bytes: bytes) -> Tuple[bytes, int, bytes]:
471    """
472    Converts GeoPackage WKB to standard WKB by removing the header.
473    
474    Parameters
475    ----------
476    gpkg_wkb_bytes: bytes
477        The GeoPackage WKB byte string.
478        
479    Returns
480    -------
481    A tuple containing the standard WKB bytes, SRID, and flags.
482    """
483    magic_number = gpkg_wkb_bytes[0:2]
484    if magic_number != b'GP':
485        raise ValueError("Invalid GeoPackage WKB header: missing magic number.")
486    
487    try:
488        header = gpkg_wkb_bytes[0:8]
489        header_vals = struct.unpack('<ccBBi', header)
490        flags = header_vals[-2]
491        srid = header_vals[-1]
492    except struct.error:
493        header = gpkg_wkb_bytes[0:6]
494        header_vals = struct.unpack('<ccBBh', header)
495        flags = header_vals[-2]
496        srid = header_vals[-1]
497
498    envelope_type = (flags >> 1) & 0x07
499    envelope_sizes = {
500        0: 0,
501        1: 32,
502        2: 48,
503        3: 48,
504        4: 64,
505    }
506    header_length = 8 + envelope_sizes.get(envelope_type, 0)
507    standard_wkb_bytes = gpkg_wkb_bytes[header_length:]
508    return standard_wkb_bytes, srid, flags

Converts GeoPackage WKB to standard WKB by removing the header.

Parameters
  • gpkg_wkb_bytes (bytes): The GeoPackage WKB byte string.
Returns
  • A tuple containing the standard WKB bytes, SRID, and flags.
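A minimal sketch showing the 8-byte header this function strips (the payload bytes here are placeholders, not real WKB):
>>> import struct
>>> header = struct.pack('<ccBBi', b'G', b'P', 0, 1, 4326)   # magic, version, flags, SRID
>>> gpkg_wkb_to_wkb(header + b'<wkb payload>')
(b'<wkb payload>', 4326, 1)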
def value_is_null(value: Any) -> bool:
511def value_is_null(value: Any) -> bool:
512    """
513    Determine if a value is a null-like string.
514    """
515    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')

Determine if a value is a null-like string.

def none_if_null(value: Any) -> Any:
518def none_if_null(value: Any) -> Any:
519    """
520    Return `None` if a value is a null-like string.
521    """
522    return (None if value_is_null(value) else value)

Return None if a value is a null-like string.
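These two helpers are used throughout the module; illustrative usage:
>>> value_is_null('NaN')
True
>>> value_is_null(0)
False
>>> none_if_null('<NA>') is None
True
>>> none_if_null('abc')
'abc'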

def quantize_decimal(x: decimal.Decimal, precision: int, scale: int) -> decimal.Decimal:
525def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
526    """
527    Quantize a given `Decimal` to a known scale and precision.
528
529    Parameters
530    ----------
531    x: Decimal
532        The `Decimal` to be quantized.
533
534    precision: int
535        The total number of significant digits.
536
537    scale: int
538        The number of significant digits after the decimal point.
539
540    Returns
541    -------
542    A `Decimal` quantized to the specified scale and precision.
543    """
544    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
545    try:
546        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
547    except InvalidOperation:
548        pass
549
550    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")

Quantize a given Decimal to a known scale and precision.

Parameters
  • x (Decimal): The Decimal to be quantized.
  • precision (int): The total number of significant digits.
  • scale (int): The number of significant digits after the decimal point.
Returns
  • A Decimal quantized to the specified scale and precision.
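Illustrative usage (a `ValueError` is raised if the value cannot fit within the given precision):
>>> from decimal import Decimal
>>> quantize_decimal(Decimal('123.456'), 6, 2)
Decimal('123.46')
>>> quantize_decimal(Decimal('2.005'), 4, 2)
Decimal('2.01')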
def serialize_decimal( x: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
553def serialize_decimal(
554    x: Any,
555    quantize: bool = False,
556    precision: Optional[int] = None,
557    scale: Optional[int] = None,
558) -> Any:
559    """
560    Return a quantized string of an input decimal.
561
562    Parameters
563    ----------
564    x: Any
565        The potential decimal to be serialized.
566
567    quantize: bool, default False
568        If `True`, quantize the incoming Decimal to the specified scale and precision
569        before serialization.
570
571    precision: Optional[int], default None
572        The precision of the decimal to be quantized.
573
574    scale: Optional[int], default None
575        The scale of the decimal to be quantized.
576
577    Returns
578    -------
579    A string of the input decimal or the input if not a Decimal.
580    """
581    if not isinstance(x, Decimal):
582        return x
583
584    if value_is_null(x):
585        return None
586
587    if quantize and scale and precision:
588        x = quantize_decimal(x, precision, scale)
589
590    return f"{x:f}"

Return a quantized string of an input decimal.

Parameters
  • x (Any): The potential decimal to be serialized.
  • quantize (bool, default False): If True, quantize the incoming Decimal to the specified scale and precision before serialization.
  • precision (Optional[int], default None): The precision of the decimal to be quantized.
  • scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
  • A string of the input decimal or the input if not a Decimal.
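Illustrative usage (non-Decimal inputs pass through unchanged):
>>> from decimal import Decimal
>>> serialize_decimal(Decimal('1.5'), quantize=True, precision=5, scale=2)
'1.50'
>>> serialize_decimal(Decimal('NaN')) is None
True
>>> serialize_decimal('3.14')
'3.14'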
def coerce_timezone(dt: Any, strip_utc: bool = False) -> Any:
593def coerce_timezone(
594    dt: Any,
595    strip_utc: bool = False,
596) -> Any:
597    """
598    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
599    return a UTC timestamp (strip the timezone if `strip_utc` is `True`).
600    """
601    if dt is None:
602        return None
603
604    if isinstance(dt, int):
605        return dt
606
607    if isinstance(dt, str):
608        dateutil_parser = mrsm.attempt_import('dateutil.parser')
609        try:
610            dt = dateutil_parser.parse(dt)
611        except Exception:
612            return dt
613
614    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
615    if dt_is_series:
616        pandas = mrsm.attempt_import('pandas', lazy=False)
617
618        if (
619            pandas.api.types.is_datetime64_any_dtype(dt) and (
620                (dt.dt.tz is not None and not strip_utc)
621                or
622                (dt.dt.tz is None and strip_utc)
623            )
624        ):
625            return dt
626
627        dt_series = to_datetime(dt, coerce_utc=False)
628        if dt_series.dt.tz is None:
629            dt_series = dt_series.dt.tz_localize(timezone.utc)
630        if strip_utc:
631            try:
632                if dt_series.dt.tz is not None:
633                    dt_series = dt_series.dt.tz_localize(None)
634            except Exception:
635                pass
636
637        return dt_series
638
639    if dt.tzinfo is None:
640        if strip_utc:
641            return dt
642        return dt.replace(tzinfo=timezone.utc)
643
644    utc_dt = dt.astimezone(timezone.utc)
645    if strip_utc:
646        return utc_dt.replace(tzinfo=None)
647    return utc_dt

Given a datetime, pandas Timestamp or Series of Timestamp, return a UTC timestamp (strip the timezone if strip_utc is True).
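
Illustrative usage with plain datetime objects (Series inputs behave analogously):
>>> from datetime import datetime, timezone
>>> coerce_timezone(datetime(2024, 1, 1))
datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
>>> coerce_timezone(datetime(2024, 1, 1, tzinfo=timezone.utc), strip_utc=True)
datetime.datetime(2024, 1, 1, 0, 0)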

def to_datetime( dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True, precision_unit: Optional[str] = None) -> Any:
650def to_datetime(
651    dt_val: Any,
652    as_pydatetime: bool = False,
653    coerce_utc: bool = True,
654    precision_unit: Optional[str] = None,
655) -> Any:
656    """
657    Wrap `pd.to_datetime()` and add support for out-of-bounds values.
658
659    Parameters
660    ----------
661    dt_val: Any
662        The value to coerce to Pandas Timestamps.
663
664    as_pydatetime: bool, default False
665        If `True`, return a Python datetime object.
666
667    coerce_utc: bool, default True
668        If `True`, ensure the value has UTC tzinfo.
669
670    precision_unit: Optional[str], default None
671        If provided, enforce the provided precision unit.
672    """
673    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
674    is_dask = 'dask' in getattr(dt_val, '__module__', '')
675    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
676    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
677    pd = pandas if dd is None else dd
678    enforce_precision = precision_unit is not None
679    precision_unit = precision_unit or 'microsecond'
680    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
681    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
682    if not precision_abbreviation:
683        raise ValueError(f"Invalid precision '{precision_unit}'.")
684
685    def parse(x: Any) -> Any:
686        try:
687            return dateutil_parser.parse(x)
688        except Exception:
689            return x
690
691    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
692        dtype_check_against = (
693            f"datetime64[{precision_abbreviation}, UTC]"
694            if with_utc
695            else f"datetime64[{precision_abbreviation}]"
696        )
697        return (
698            dtype_to_check == dtype_check_against
699            if enforce_precision
700            else (
701                dtype_to_check.startswith('datetime64[')
702                and (
703                    ('utc' in dtype_to_check.lower())
704                    if with_utc
705                    else ('utc' not in dtype_to_check.lower())
706                )
707            )
708        )
709
710    if isinstance(dt_val, pd.Timestamp):
711        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
712        return (
713            coerce_timezone(dt_val_to_return)
714            if coerce_utc
715            else dt_val_to_return
716        )
717
718    if dt_is_series:
719        changed_tz = False
720        original_tz = None
721        dtype = str(getattr(dt_val, 'dtype', 'object'))
722        if (
723            are_dtypes_equal(dtype, 'datetime')
724            and 'utc' not in dtype.lower()
725            and hasattr(dt_val, 'dt')
726        ):
727            original_tz = dt_val.dt.tz
728            dt_val = dt_val.dt.tz_localize(timezone.utc)
729            changed_tz = True
730            dtype = str(getattr(dt_val, 'dtype', 'object'))
731        try:
732            new_dt_series = (
733                dt_val
734                if check_dtype(dtype, with_utc=True)
735                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
736            )
737        except pd.errors.OutOfBoundsDatetime:
738            try:
739                next_precision = get_next_precision_unit(true_precision_unit)
740                next_precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
741                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbreviation}, UTC]")
742            except Exception:
743                new_dt_series = None
744        except ValueError:
745            new_dt_series = None
746        except TypeError:
747            try:
748                new_dt_series = (
749                    new_dt_series
750                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
751                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
752                )
753            except Exception:
754                new_dt_series = None
755
756        if new_dt_series is None:
757            new_dt_series = dt_val.apply(lambda x: parse(str(x)))
758
759        if coerce_utc:
760            return coerce_timezone(new_dt_series)
761
762        if changed_tz:
763            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
764        return new_dt_series
765
766    try:
767        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
768        if new_dt_val.unit != precision_abbreviation:
769            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
770        if as_pydatetime:
771            return new_dt_val.to_pydatetime()
772        return new_dt_val
773    except (pd.errors.OutOfBoundsDatetime, ValueError):
774        pass
775
776    new_dt_val = parse(dt_val)
777    if not coerce_utc:
778        return new_dt_val
779    return coerce_timezone(new_dt_val)

Wrap pd.to_datetime() and add support for out-of-bounds values.

Parameters
  • dt_val (Any): The value to coerce to Pandas Timestamps.
  • as_pydatetime (bool, default False): If True, return a Python datetime object.
  • coerce_utc (bool, default True): If True, ensure the value has UTC tzinfo.
  • precision_unit (Optional[str], default None): If provided, enforce the provided precision unit.
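Illustrative usage (outputs shown as comments, since the exact repr depends on the installed pandas version):
>>> to_datetime('2024-01-01')                      # Timestamp('2024-01-01 00:00:00+0000', tz='UTC')
>>> to_datetime('2024-01-01', as_pydatetime=True)  # datetime.datetime(2024, 1, 1, 0, 0, tzinfo=...)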
def serialize_bytes(data: bytes) -> str:
782def serialize_bytes(data: bytes) -> str:
783    """
784    Return the given bytes as a base64-encoded string.
785    """
786    import base64
787    if not isinstance(data, bytes) and value_is_null(data):
788        return data
789    return base64.b64encode(data).decode('utf-8')

Return the given bytes as a base64-encoded string.
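Illustrative usage (`deserialize_bytes_string` below reverses this encoding):
>>> serialize_bytes(b'hello')
'aGVsbG8='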

def serialize_geometry( geom: Any, geometry_format: str = 'wkb_hex', srid: Optional[int] = None) -> Union[str, Dict[str, Any], bytes, NoneType]:
792def serialize_geometry(
793    geom: Any,
794    geometry_format: str = 'wkb_hex',
795    srid: Optional[int] = None,
796) -> Union[str, Dict[str, Any], bytes, None]:
797    """
798    Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON. 
799
800    Parameters
801    ----------
802    geom: Any
803        The potential geometry data to be serialized.
804
805    geometry_format: str, default 'wkb_hex'
806        The serialization format for geometry data.
807        Accepted formats are `wkb`, `wkb_hex`, `wkt`, `geojson`, and `gpkg_wkb`.
808
809    srid: Optional[int], default None
810        If provided, use this as the source CRS when serializing to GeoJSON.
811
812    Returns
813    -------
814    A string containing the geometry data, or bytes, or a dictionary, or None.
815    """
816    if value_is_null(geom):
817        return None
818
819    shapely, shapely_ops, pyproj, np = mrsm.attempt_import(
820        'shapely', 'shapely.ops', 'pyproj', 'numpy',
821        lazy=False,
822    )
823    if geometry_format == 'geojson':
824        if srid:
825            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
826            geom = shapely_ops.transform(transformer.transform, geom)
827        geojson_str = shapely.to_geojson(geom)
828        return json.loads(geojson_str)
829
830    if not hasattr(geom, 'wkb_hex'):
831        return str(geom)
832
833    byte_order = 1 if np.little_endian else 0
834
835    if geometry_format.startswith("wkb"):
836        return shapely.to_wkb(geom, hex=(geometry_format=="wkb_hex"), include_srid=True)
837    elif geometry_format == 'gpkg_wkb':
838        wkb_data = shapely.to_wkb(geom, hex=False, byte_order=byte_order)
839        flags = (
840            ((byte_order & 0x01) | (0x20))
841            if geom.is_empty
842            else (byte_order & 0x01)
843        )
844        srid_val = srid or -1
845        header = struct.pack(
846            '<ccBBi',
847            b'G', b'P',
848            0,
849            flags,
850            srid_val
851        )
852        return header + wkb_data
853
854    return shapely.to_wkt(geom)

Serialize geometry data as WKB, WKB (hex), GPKG-WKB, WKT, or GeoJSON.

Parameters
  • geom (Any): The potential geometry data to be serialized.
  • geometry_format (str, default 'wkb_hex'): The serialization format for geometry data. Accepted formats are wkb, wkb_hex, wkt, geojson, and gpkg_wkb.
  • srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON.
Returns
  • A string containing the geometry data, or bytes, or a dictionary, or None.
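Illustrative usage, assuming `shapely` is installed:
>>> from shapely import Point
>>> serialize_geometry(Point(1, 2), geometry_format='wkt')
'POINT (1 2)'
>>> serialize_geometry(Point(1, 2), geometry_format='geojson')
{'type': 'Point', 'coordinates': [1.0, 2.0]}
>>> serialize_geometry(Point(1, 2))   # default 'wkb_hex' returns a hex WKB string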
def deserialize_geometry(geom_wkb: Union[str, bytes]):
857def deserialize_geometry(geom_wkb: Union[str, bytes]):
858    """
859    Deserialize a WKB string into a shapely geometry object.
860    """
861    shapely = mrsm.attempt_import('shapely', lazy=False)
862    return shapely.wkb.loads(geom_wkb)

Deserialize a WKB string into a shapely geometry object.

def project_geometry(geom, srid: int, to_srid: int = 4326):
865def project_geometry(geom, srid: int, to_srid: int = 4326):
866    """
867    Project a shapely geometry object to a new CRS (SRID).
868    """
869    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
870    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
871    return shapely_ops.transform(transformer.transform, geom)

Project a shapely geometry object to a new CRS (SRID).

def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Optional[bytes]:
874def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
875    """
876    Given a serialized ASCII string of bytes data, return the original bytes.
877    The input data may either be base64- or hex-encoded.
878
879    Parameters
880    ----------
881    data: Optional[str]
882        The string to be deserialized into bytes.
883        May be base64- or hex-encoded (prefixed with `'\\x'`).
884
885    force_hex: bool = False
886        If `True`, treat the input string as hex-encoded.
887        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
888        This will still strip the leading `'\\x'` prefix if present.
889
890    Returns
891    -------
892    The original bytes used to produce the encoded string `data`.
893    """
894    if not isinstance(data, str) and value_is_null(data):
895        return data
896
897    import binascii
898    import base64
899
900    is_hex = force_hex or data.startswith('\\x')
901
902    if is_hex:
903        if data.startswith('\\x'):
904            data = data[2:]
905        return binascii.unhexlify(data)
906
907    return base64.b64decode(data)

Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.

Parameters
  • data (Optional[str]): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with '\x').
  • force_hex (bool = False): If True, treat the input string as hex-encoded. If data does not begin with the prefix '\x', set force_hex to True. This will still strip the leading '\x' prefix if present.
Returns
  • The original bytes used to produce the encoded string data.
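Illustrative usage (base64 by default, hex when prefixed with '\x' or when force_hex=True):
>>> deserialize_bytes_string('aGVsbG8=')
b'hello'
>>> deserialize_bytes_string('\\x68656c6c6f')
b'hello'
>>> deserialize_bytes_string('68656c6c6f', force_hex=True)
b'hello'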
def deserialize_base64(data: str) -> bytes:
910def deserialize_base64(data: str) -> bytes:
911    """
912    Return the original bytestring from the given base64-encoded string.
913    """
914    import base64
915    return base64.b64decode(data)

Return the original bytestring from the given base64-encoded string.

def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Optional[str]:
918def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
919    """
920    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
921    """
922    import binascii
923    if not isinstance(data, bytes) and value_is_null(data):
924        return data
925    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')

Return the given bytes as a hex string for PostgreSQL's BYTEA type.
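Illustrative usage (the inverse of the hex path in `deserialize_bytes_string`):
>>> encode_bytes_for_bytea(b'hello')
'\\x68656c6c6f'
>>> encode_bytes_for_bytea(b'hello', with_prefix=False)
'68656c6c6f'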

def serialize_datetime(dt: datetime.datetime) -> Optional[str]:
928def serialize_datetime(dt: datetime) -> Union[str, None]:
929    """
930    Serialize a datetime object into JSON (ISO format string).
931
932    Examples
933    --------
934    >>> import json
935    >>> from datetime import datetime
936    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
937    '{"a": "2022-01-01T00:00:00Z"}'
938
939    """
940    if not hasattr(dt, 'isoformat'):
941        return None
942
943    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
944    return dt.isoformat() + tz_suffix

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
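Timezone-aware datetimes keep their offset instead of the 'Z' suffix, and non-datetime inputs return None (illustrative):
>>> from datetime import datetime, timezone
>>> serialize_datetime(datetime(2022, 1, 1, tzinfo=timezone.utc))
'2022-01-01T00:00:00+00:00'
>>> serialize_datetime('not a datetime') is None
True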
def serialize_date(d: datetime.date) -> Optional[str]:
947def serialize_date(d: date) -> Union[str, None]:
948    """
949    Serialize a date object into its ISO representation.
950    """
951    return d.isoformat() if hasattr(d, 'isoformat') else None

Serialize a date object into its ISO representation.

def json_serialize_value(x: Any, default_to_str: bool = True) -> Optional[str]:
954def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
955    """
956    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
957
958    Parameters
959    ----------
960    x: Any
961        The value to serialize.
962
963    default_to_str: bool, default True
964        If `True`, return a string of `x` if x is not a designated type.
965        Otherwise return x.
966
967    Returns
968    -------
969    A serialized version of x, or x.
970    """
971    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
972        return x.meta
973
974    if hasattr(x, 'tzinfo'):
975        return serialize_datetime(x)
976
977    if hasattr(x, 'isoformat'):
978        return serialize_date(x)
979
980    if isinstance(x, bytes):
981        return serialize_bytes(x)
982
983    if isinstance(x, Decimal):
984        return serialize_decimal(x)
985
986    if 'shapely' in str(type(x)):
987        return serialize_geometry(x)
988
989    if value_is_null(x):
990        return None
991
992    if isinstance(x, (dict, list, tuple)):
993        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
994
995    return str(x) if default_to_str else x

Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

Parameters
  • x (Any): The value to serialize.
  • default_to_str (bool, default True): If True, return a string of x if x is not a designated type. Otherwise return x.
Returns
  • A serialized version of x, or x.
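Illustrative usage as a `json.dumps` default handler:
>>> import json
>>> from decimal import Decimal
>>> from datetime import datetime
>>> json.dumps(
...     {'amount': Decimal('1.50'), 'ts': datetime(2022, 1, 1), 'blob': b'hi'},
...     default=json_serialize_value,
... )
'{"amount": "1.50", "ts": "2022-01-01T00:00:00Z", "blob": "aGk="}'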
def get_geometry_type_srid( dtype: str = 'geometry', default_type: str = 'geometry', default_srid: int = 4326) -> Union[Tuple[str, int], Tuple[str, NoneType]]:
 998def get_geometry_type_srid(
 999    dtype: str = 'geometry',
1000    default_type: str = 'geometry',
1001    default_srid: int = 4326,
1002) -> Union[Tuple[str, int], Tuple[str, None]]:
1003    """
1004    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).
1005
1006    Parameters
1007    ----------
1008    dtype: Optional[str], default None
1009        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
1010        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
1011
1012        - `Point`
1013        - `LineString`
1014        - `LinearRing`
1015        - `Polygon`
1016        - `MultiPoint`
1017        - `MultiLineString`
1018        - `MultiPolygon`
1019        - `GeometryCollection`
1020
1021    Returns
1022    -------
1023    A tuple in the form (type, SRID).
1024    Defaults to `(default_type, default_srid)`.
1025
1026    Examples
1027    --------
1028    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
1029    >>> get_geometry_type_srid()
1030    ('geometry', 4326)
1031    >>> get_geometry_type_srid('geometry[]')
1032    ('geometry', 4326)
1033    >>> get_geometry_type_srid('geometry[Point, 0]')
1034    ('Point', 0)
1035    >>> get_geometry_type_srid('geometry[0, Point]')
1036    ('Point', 0)
1037    >>> get_geometry_type_srid('geometry[0]')
1038    ('geometry', 0)
1039    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
1040    ('MultiLineString', 4326)
1041    >>> get_geometry_type_srid('geography')
1042    ('geometry', 4326)
1043    >>> get_geometry_type_srid('geography[POINT]')
1044    ('Point', 4326)
1045    """
1046    from meerschaum.utils.misc import is_int
1047    ### NOTE: PostGIS syntax must also be parsed.
1048    dtype = dtype.replace('(', '[').replace(')', ']')
1049    bare_dtype = dtype.split('[', maxsplit=1)[0]
1050    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
1051    if not modifier:
1052        return default_type, default_srid
1053
1054    parts = [
1055        part.split('=')[-1].strip()
1056        for part in modifier.split(',')
1057    ]
1058    parts_casted = [
1059        (
1060            int(part)
1061            if is_int(part)
1062            else part
1063        )
1064        for part in parts
1065    ]
1066
1067    srid = default_srid
1068    geometry_type = default_type
1069
1070    for part in parts_casted:
1071        if isinstance(part, int):
1072            srid = part
1073            break
1074
1075    for part in parts_casted:
1076        if isinstance(part, str):
1077            geometry_type = part
1078            break
1079
1080    return geometry_type, srid

Given the specified geometry dtype, return a tuple in the form (type, SRID).

Parameters
  • dtype (str, default 'geometry'): Optionally provide a specific geometry syntax (e.g. geometry[MultiLineString, 4326]). You may specify a supported shapely geometry type and an SRID in the dtype modifier:

    • Point
    • LineString
    • LinearRing
    • Polygon
    • MultiPoint
    • MultiLineString
    • MultiPolygon
    • GeometryCollection
Returns
  • A tuple in the form (type, SRID).
  • Defaults to (default_type, default_srid).
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 4326)
>>> get_geometry_type_srid('geometry[]')
('geometry', 4326)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MultiLineString', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 4326)
>>> get_geometry_type_srid('geography[POINT]')
('Point', 4326)
def get_current_timestamp( precision_unit: str = 'microsecond', precision_interval: int = 1, round_to: str = 'down', as_pandas: bool = False, as_int: bool = False, _now: Union[datetime.datetime, int, NoneType] = None) -> 'Union[datetime, pd.Timestamp, int]':
1083def get_current_timestamp(
1084    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
1085    precision_interval: int = 1,
1086    round_to: str = 'down',
1087    as_pandas: bool = False,
1088    as_int: bool = False,
1089    _now: Union[datetime, int, None] = None,
1090) -> 'Union[datetime, pd.Timestamp, int]':
1091    """
1092    Return the current UTC timestamp at the specified precision (up to nanoseconds).
1093
1094    Parameters
1095    ----------
1096    precision_unit: str, default 'us'
1097        The precision of the timestamp to be returned.
1098        Valid values are the following:
1099            - `ns` / `nanosecond`
1100            - `us` / `microsecond`
1101            - `ms` / `millisecond`
1102            - `s` / `sec` / `second`
1103            - `m` / `min` / `minute`
1104            - `h` / `hr` / `hour`
1105            - `d` / `day`
1106
1107    precision_interval: int, default 1
1108        Round the timestamp to the `precision_interval` units.
1109        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
1110        Note: `precision_interval` must be 1 when `precision='nanosecond'`.
1111
1112    round_to: str, default 'down'
1113        The direction to which to round the timestamp.
1114        Available options are `down`, `up`, and `closest`.
1115
1116    as_pandas: bool, default False
1117        If `True`, return a Pandas Timestamp.
1118        This is always `True` if `precision_unit` is `nanosecond`.
1119
1120    as_int: bool, default False
1121        If `True`, return the timestamp as an integer.
1122        Overrides `as_pandas`.
1123
1124    Returns
1125    -------
1126    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
1127
1128    Examples
1129    --------
1130    >>> get_current_timestamp('ns')
1131    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
1132    >>> get_current_timestamp('ms')
1133    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
1134    """
1135    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1136    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
1137        from meerschaum.utils.misc import items_str
1138        raise ValueError(
1139            f"Unknown precision unit '{precision_unit}'. "
1140            "Accepted values are "
1141            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
1142        )
1143
1144    if not as_int:
1145        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
1146    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None
1147
1148    if true_precision_unit == 'nanosecond':
1149        if precision_interval != 1:
1150            warn("`precision_interval` must be 1 for nanosecond precision.")
1151        now_ts = time.time_ns() if not isinstance(_now, int) else _now
1152        if as_int:
1153            return now_ts
1154        return pd.to_datetime(now_ts, unit='ns', utc=True)
1155
1156    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
1157    delta = timedelta(**{true_precision_unit + 's': precision_interval})
1158    rounded_now = round_time(now, delta, to=round_to)
1159
1160    if as_int:
1161        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])
1162
1163    ts_val = (
1164        pd.to_datetime(rounded_now, utc=True)
1165        if as_pandas
1166        else rounded_now
1167    )
1168
1169    if not as_pandas:
1170        return ts_val
1171
1172    as_unit_precisions = ('microsecond', 'millisecond', 'second')
1173    if true_precision_unit not in as_unit_precisions:
1174        return ts_val
1175
1176    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])

Return the current UTC timestamp at the specified precision (up to nanoseconds).

Parameters
  • precision_unit (str, default 'us'): The precision of the timestamp to be returned. Valid values are the following:

    • ns / nanosecond
    • us / microsecond
    • ms / millisecond
    • s / sec / second
    • m / min / minute
    • h / hr / hour
    • d / day
  • precision_interval (int, default 1): Round the timestamp to the precision_interval units. For example, precision='minute' and precision_interval=15 will round to 15-minute intervals. Note: precision_interval must be 1 when precision='nanosecond'.
  • round_to (str, default 'down'): The direction to which to round the timestamp. Available options are down, up, and closest.
  • as_pandas (bool, default False): If True, return a Pandas Timestamp. This is always True if precision_unit is nanosecond.
  • as_int (bool, default False): If True, return the timestamp as an integer. Overrides as_pandas.
Returns
  • A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms')
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
def is_dtype_special(type_: str) -> bool:
1179def is_dtype_special(type_: str) -> bool:
1180    """
1181    Return whether a dtype should be treated as a special Meerschaum dtype.
1182    This is not the same as a Meerschaum alias.
1183    """
1184    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)
1185    if true_type in (
1186        'uuid',
1187        'json',
1188        'bytes',
1189        'numeric',
1190        'datetime',
1191        'geometry',
1192        'geography',
1193        'date',
1194        'bool',
1195    ):
1196        return True
1197
1198    if are_dtypes_equal(true_type, 'datetime'):
1199        return True
1200
1201    if are_dtypes_equal(true_type, 'date'):
1202        return True
1203
1204    if true_type.startswith('numeric'):
1205        return True
1206
1207    if true_type.startswith('bool'):
1208        return True
1209
1210    if true_type.startswith('geometry'):
1211        return True
1212
1213    if true_type.startswith('geography'):
1214        return True
1215
1216    return False

Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.
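Illustrative usage (the exact set of special dtypes is defined in the implementation above):
>>> is_dtype_special('numeric[10,2]')
True
>>> is_dtype_special('decimal')   # alias of 'numeric'
True
>>> is_dtype_special('int')
False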

def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
1219def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
1220    """
1221    Get the next precision string in order of value.
1222
1223    Parameters
1224    ----------
1225    precision_unit: str
1226        The precision string (`'nanosecond'`, `'ms'`, etc.).
1227
1228    decrease: bool, default True
1229        If `True`, return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
1230        If `False`, return the precision unit which is higher.
1231
1232    Returns
1233    -------
1234    A `precision` string which is lower or higher than the given precision unit.
1235
1236    Examples
1237    --------
1238    >>> get_next_precision_unit('nanosecond')
1239    'microsecond'
1240    >>> get_next_precision_unit('ms')
1241    'second'
1242    >>> get_next_precision_unit('hour', decrease=False)
1243    'minute'
1244    """
1245    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1246    precision_scalar = MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None)
1247    if not precision_scalar:
1248        raise ValueError(f"Invalid precision unit '{precision_unit}'.")
1249
1250    precisions = sorted(
1251        list(MRSM_PRECISION_UNITS_SCALARS),
1252        key=lambda p: MRSM_PRECISION_UNITS_SCALARS[p]
1253    )
1254
1255    precision_index = precisions.index(true_precision_unit)
1256    new_precision_index = precision_index + (-1 if decrease else 1)
1257    if new_precision_index < 0 or new_precision_index >= len(precisions):
1258        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")
1259
1260    return precisions[new_precision_index]

Get the next precision string in order of value.

Parameters
  • precision_unit (str): The precision string ('nanosecond', 'ms', etc.).
  • decrease (bool, default True): If True, return the precision unit which is lower (e.g. nanosecond -> microsecond). If False, return the precision unit which is higher.
Returns
  • A precision string which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
1263def round_time(
1264    dt: Optional[datetime] = None,
1265    date_delta: Optional[timedelta] = None,
1266    to: 'str' = 'down'
1267) -> datetime:
1268    """
1269    Round a datetime object to a multiple of a timedelta.
1270
1271    Parameters
1272    ----------
1273    dt: Optional[datetime], default None
1274        If `None`, grab the current UTC datetime.
1275
1276    date_delta: Optional[timedelta], default None
1277        If `None`, use a delta of 1 minute.
1278
1279    to: 'str', default 'down'
1280        Available options are `'up'`, `'down'`, and `'closest'`.
1281
1282    Returns
1283    -------
1284    A rounded `datetime` object.
1285
1286    Examples
1287    --------
1288    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
1289    datetime.datetime(2022, 1, 1, 12, 15)
1290    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
1291    datetime.datetime(2022, 1, 1, 12, 16)
1292    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
1293    datetime.datetime(2022, 1, 1, 12, 0)
1294    >>> round_time(
1295    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
1296    ...   timedelta(hours=1),
1297    ...   to = 'closest'
1298    ... )
1299    datetime.datetime(2022, 1, 1, 12, 0)
1300    >>> round_time(
1301    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
1302    ...   timedelta(hours=1),
1303    ...   to = 'closest'
1304    ... )
1305    datetime.datetime(2022, 1, 1, 13, 0)
1306
1307    """
1308    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP
1309    if date_delta is None:
1310        date_delta = timedelta(minutes=1)
1311
1312    if dt is None:
1313        dt = datetime.now(timezone.utc).replace(tzinfo=None)
1314
1315    def get_total_microseconds(td: timedelta) -> int:
1316        return (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds
1317
1318    round_to_microseconds = get_total_microseconds(date_delta)
1319    if round_to_microseconds == 0:
1320        return dt
1321
1322    dt_delta_from_min = dt.replace(tzinfo=None) - datetime.min
1323    dt_total_microseconds = get_total_microseconds(dt_delta_from_min)
1324
1325    dt_dec = Decimal(dt_total_microseconds)
1326    round_to_dec = Decimal(round_to_microseconds)
1327
1328    div = dt_dec / round_to_dec
1329    if to == 'down':
1330        num_intervals = div.to_integral_value(rounding=ROUND_DOWN)
1331    elif to == 'up':
1332        num_intervals = div.to_integral_value(rounding=ROUND_UP)
1333    else:
1334        num_intervals = div.to_integral_value(rounding=ROUND_HALF_UP)
1335
1336    rounded_dt_total_microseconds = num_intervals * round_to_dec
1337    adjustment_microseconds = int(rounded_dt_total_microseconds) - dt_total_microseconds
1338
1339    return dt + timedelta(microseconds=adjustment_microseconds)

Round a datetime object to a multiple of a timedelta.

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)