meerschaum.utils.dtypes

Utility functions for working with data types.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with data types.
   7"""
   8
   9import traceback
  10import json
  11import uuid
  12import time
  13from datetime import timezone, datetime, date, timedelta
  14from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP
  15
  16import meerschaum as mrsm
  17from meerschaum.utils.typing import Dict, Union, Any, Optional, Tuple
  18from meerschaum.utils.warnings import warn
  19from meerschaum._internal.static import STATIC_CONFIG as _STATIC_CONFIG
  20
### Accepted alternate spellings of Meerschaum dtypes.
### Keys are the aliases and values are the canonical Meerschaum dtype names.
MRSM_ALIAS_DTYPES: Dict[str, str] = {
    'decimal': 'numeric',
    'Decimal': 'numeric',
    'number': 'numeric',
    'jsonl': 'json',
    'JSON': 'json',
    'binary': 'bytes',
    'blob': 'bytes',
    'varbinary': 'bytes',
    'bytea': 'bytes',
    'guid': 'uuid',
    'UUID': 'uuid',
    'geom': 'geometry',
    'geog': 'geography',
    'boolean': 'bool',
    'day': 'date',
}
### The Pandas dtype string used for each canonical Meerschaum dtype.
### The `None` key maps an unspecified dtype to `'object'`.
MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
    'json': 'object',
    'numeric': 'object',
    'geometry': 'object',
    'geography': 'object',
    'uuid': 'object',
    'date': 'date32[day][pyarrow]',
    'datetime': 'datetime64[us, UTC]',
    'bool': 'bool[pyarrow]',
    'int': 'int64[pyarrow]',
    'int8': 'int8[pyarrow]',
    'int16': 'int16[pyarrow]',
    'int32': 'int32[pyarrow]',
    'int64': 'int64[pyarrow]',
    'str': 'string',
    'bytes': 'binary[pyarrow]',
    None: 'object',
}
  56
### How many of each precision unit make up one second
### (units longer than a second are therefore fractions).
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
    'nanosecond': 1_000_000_000,
    'microsecond': 1_000_000,
    'millisecond': 1000,
    'second': 1,
    'minute': (1 / 60),
    'hour': (1 / 3600),
    'day': (1 / 86400),
}

### Short forms accepted for precision units, mapped to the full unit names
### used as keys in `MRSM_PRECISION_UNITS_SCALARS`.
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
    'ns': 'nanosecond',
    'us': 'microsecond',
    'ms': 'millisecond',
    's': 'second',
    'sec': 'second',
    'm': 'minute',
    'min': 'minute',
    'h': 'hour',
    'hr': 'hour',
    'd': 'day',
    'D': 'day',
}
### The single abbreviation emitted for each full unit name
### (e.g. interpolated into `datetime64[...]` dtype strings by `to_datetime()`).
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
    'nanosecond': 'ns',
    'microsecond': 'us',
    'millisecond': 'ms',
    'second': 's',
    'minute': 'min',
    'hour': 'hr',
    'day': 'D',
}
  89
  90
  91def to_pandas_dtype(dtype: str) -> str:
  92    """
  93    Cast a supported Meerschaum dtype to a Pandas dtype.
  94    """
  95    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
  96    if known_dtype is not None:
  97        return known_dtype
  98
  99    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
 100    if alias_dtype is not None:
 101        return MRSM_PD_DTYPES[alias_dtype]
 102
 103    if dtype.startswith('numeric'):
 104        return MRSM_PD_DTYPES['numeric']
 105
 106    if dtype.startswith('geometry'):
 107        return MRSM_PD_DTYPES['geometry']
 108
 109    if dtype.startswith('geography'):
 110        return MRSM_PD_DTYPES['geography']
 111
 112    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
 113    ### treat it as a SQL db type.
 114    if dtype.split(' ')[0].isupper():
 115        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
 116        return get_pd_type_from_db_type(dtype)
 117
 118    from meerschaum.utils.packages import attempt_import
 119    _ = attempt_import('pyarrow', lazy=False)
 120    pandas = attempt_import('pandas', lazy=False)
 121
 122    try:
 123        return str(pandas.api.types.pandas_dtype(dtype))
 124    except Exception:
 125        warn(
 126            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
 127            + f"{traceback.format_exc()}",
 128            stack=False,
 129        )
 130    return 'object'
 131
 132
 133def are_dtypes_equal(
 134    ldtype: Union[str, Dict[str, str]],
 135    rdtype: Union[str, Dict[str, str]],
 136) -> bool:
 137    """
 138    Determine whether two dtype strings may be considered
 139    equivalent to avoid unnecessary conversions.
 140
 141    Parameters
 142    ----------
 143    ldtype: Union[str, Dict[str, str]]
 144        The left dtype to compare.
 145        May also provide a dtypes dictionary.
 146
 147    rdtype: Union[str, Dict[str, str]]
 148        The right dtype to compare.
 149        May also provide a dtypes dictionary.
 150
 151    Returns
 152    -------
 153    A `bool` indicating whether the two dtypes are to be considered equivalent.
 154    """
 155    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
 156        lkeys = sorted([str(k) for k in ldtype.keys()])
 157        rkeys = sorted([str(k) for k in rdtype.keys()])
 158        for lkey, rkey in zip(lkeys, rkeys):
 159            if lkey != rkey:
 160                return False
 161            ltype = ldtype[lkey]
 162            rtype = rdtype[rkey]
 163            if not are_dtypes_equal(ltype, rtype):
 164                return False
 165        return True
 166
 167    try:
 168        if ldtype == rdtype:
 169            return True
 170    except Exception:
 171        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
 172        return False
 173
 174    ### Sometimes pandas dtype objects are passed.
 175    ldtype = str(ldtype).split('[', maxsplit=1)[0]
 176    rdtype = str(rdtype).split('[', maxsplit=1)[0]
 177
 178    if ldtype in MRSM_ALIAS_DTYPES:
 179        ldtype = MRSM_ALIAS_DTYPES[ldtype]
 180
 181    if rdtype in MRSM_ALIAS_DTYPES:
 182        rdtype = MRSM_ALIAS_DTYPES[rdtype]
 183
 184    json_dtypes = ('json', 'object')
 185    if ldtype in json_dtypes and rdtype in json_dtypes:
 186        return True
 187
 188    numeric_dtypes = ('numeric', 'decimal', 'object')
 189    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
 190        return True
 191
 192    uuid_dtypes = ('uuid', 'object')
 193    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
 194        return True
 195
 196    bytes_dtypes = ('bytes', 'object', 'binary')
 197    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
 198        return True
 199
 200    geometry_dtypes = ('geometry', 'object', 'geography')
 201    if ldtype in geometry_dtypes and rdtype in geometry_dtypes:
 202        return True
 203
 204    if ldtype.lower() == rdtype.lower():
 205        return True
 206
 207    datetime_dtypes = ('datetime', 'timestamp')
 208    ldtype_found_dt_prefix = False
 209    rdtype_found_dt_prefix = False
 210    for dt_prefix in datetime_dtypes:
 211        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
 212        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
 213    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
 214        return True
 215
 216    string_dtypes = ('str', 'string', 'object')
 217    if ldtype in string_dtypes and rdtype in string_dtypes:
 218        return True
 219
 220    int_dtypes = (
 221        'int', 'int64', 'int32', 'int16', 'int8',
 222        'uint', 'uint64', 'uint32', 'uint16', 'uint8',
 223    )
 224    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
 225        return True
 226
 227    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
 228    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
 229        return True
 230
 231    bool_dtypes = ('bool', 'boolean')
 232    if ldtype in bool_dtypes and rdtype in bool_dtypes:
 233        return True
 234
 235    date_dtypes = (
 236        'date', 'date32', 'date32[pyarrow]', 'date32[day][pyarrow]',
 237        'date64', 'date64[pyarrow]', 'date64[ms][pyarrow]',
 238    )
 239    if ldtype in date_dtypes and rdtype in date_dtypes:
 240        return True
 241
 242    return False
 243
 244
 245def is_dtype_numeric(dtype: str) -> bool:
 246    """
 247    Determine whether a given `dtype` string
 248    should be considered compatible with the Meerschaum dtype `numeric`.
 249
 250    Parameters
 251    ----------
 252    dtype: str
 253        The pandas-like dtype string.
 254
 255    Returns
 256    -------
 257    A bool indicating the dtype is compatible with `numeric`.
 258    """
 259    dtype_lower = dtype.lower()
 260
 261    acceptable_substrings = ('numeric', 'float', 'double', 'int')
 262    for substring in acceptable_substrings:
 263        if substring in dtype_lower:
 264            return True
 265
 266    return False
 267
 268
 269def attempt_cast_to_numeric(
 270    value: Any,
 271    quantize: bool = False,
 272    precision: Optional[int] = None,
 273    scale: Optional[int] = None,
 274)-> Any:
 275    """
 276    Given a value, attempt to coerce it into a numeric (Decimal).
 277
 278    Parameters
 279    ----------
 280    value: Any
 281        The value to be cast to a Decimal.
 282
 283    quantize: bool, default False
 284        If `True`, quantize the decimal to the specified precision and scale.
 285
 286    precision: Optional[int], default None
 287        If `quantize` is `True`, use this precision.
 288
 289    scale: Optional[int], default None
 290        If `quantize` is `True`, use this scale.
 291
 292    Returns
 293    -------
 294    A `Decimal` if possible, or `value`.
 295    """
 296    if isinstance(value, Decimal):
 297        if quantize and precision and scale:
 298            return quantize_decimal(value, precision, scale)
 299        return value
 300    try:
 301        if value_is_null(value):
 302            return Decimal('NaN')
 303
 304        dec = Decimal(str(value))
 305        if not quantize or not precision or not scale:
 306            return dec
 307        return quantize_decimal(dec, precision, scale)
 308    except Exception:
 309        return value
 310
 311
 312def attempt_cast_to_uuid(value: Any) -> Any:
 313    """
 314    Given a value, attempt to coerce it into a UUID (`uuid4`).
 315    """
 316    if isinstance(value, uuid.UUID):
 317        return value
 318    try:
 319        return (
 320            uuid.UUID(str(value))
 321            if not value_is_null(value)
 322            else None
 323        )
 324    except Exception:
 325        return value
 326
 327
 328def attempt_cast_to_bytes(value: Any) -> Any:
 329    """
 330    Given a value, attempt to coerce it into a bytestring.
 331    """
 332    if isinstance(value, bytes):
 333        return value
 334    try:
 335        return (
 336            deserialize_bytes_string(str(value))
 337            if not value_is_null(value)
 338            else None
 339        )
 340    except Exception:
 341        return value
 342
 343
 344def attempt_cast_to_geometry(value: Any) -> Any:
 345    """
 346    Given a value, attempt to coerce it into a `shapely` (`geometry`) object.
 347    """
 348    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
 349        'shapely',
 350        'shapely.wkt',
 351        'shapely.wkb',
 352        lazy=False,
 353    )
 354    if 'shapely' in str(type(value)):
 355        return value
 356
 357    if isinstance(value, (dict, list)):
 358        try:
 359            return shapely.from_geojson(json.dumps(value))
 360        except Exception:
 361            return value
 362
 363    value_is_wkt = geometry_is_wkt(value)
 364    if value_is_wkt is None:
 365        return value
 366
 367    try:
 368        return (
 369            shapely_wkt.loads(value)
 370            if value_is_wkt
 371            else shapely_wkb.loads(value)
 372        )
 373    except Exception:
 374        return value
 375
 376
 377def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
 378    """
 379    Determine whether an input value should be treated as WKT or WKB geometry data.
 380
 381    Parameters
 382    ----------
 383    value: Union[str, bytes]
 384        The input data to be parsed into geometry data.
 385
 386    Returns
 387    -------
 388    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
 389    Return `None` if `value` should be parsed as neither.
 390    """
 391    import re
 392    if not isinstance(value, (str, bytes)):
 393        return None
 394
 395    if isinstance(value, bytes):
 396        return False
 397    
 398    wkt_pattern = r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)\s*\(.*\)\s*$'
 399    if re.match(wkt_pattern, value, re.IGNORECASE):
 400        return True
 401    
 402    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
 403        return False
 404    
 405    return None
 406
 407
 408def value_is_null(value: Any) -> bool:
 409    """
 410    Determine if a value is a null-like string.
 411    """
 412    return str(value).lower() in ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
 413
 414
 415def none_if_null(value: Any) -> Any:
 416    """
 417    Return `None` if a value is a null-like string.
 418    """
 419    return (None if value_is_null(value) else value)
 420
 421
 422def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
 423    """
 424    Quantize a given `Decimal` to a known scale and precision.
 425
 426    Parameters
 427    ----------
 428    x: Decimal
 429        The `Decimal` to be quantized.
 430
 431    precision: int
 432        The total number of significant digits.
 433
 434    scale: int
 435        The number of significant digits after the decimal point.
 436
 437    Returns
 438    -------
 439    A `Decimal` quantized to the specified scale and precision.
 440    """
 441    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
 442    try:
 443        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
 444    except InvalidOperation:
 445        pass
 446
 447    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")
 448
 449
 450def serialize_decimal(
 451    x: Any,
 452    quantize: bool = False,
 453    precision: Optional[int] = None,
 454    scale: Optional[int] = None,
 455) -> Any:
 456    """
 457    Return a quantized string of an input decimal.
 458
 459    Parameters
 460    ----------
 461    x: Any
 462        The potential decimal to be serialized.
 463
 464    quantize: bool, default False
 465        If `True`, quantize the incoming Decimal to the specified scale and precision
 466        before serialization.
 467
 468    precision: Optional[int], default None
 469        The precision of the decimal to be quantized.
 470
 471    scale: Optional[int], default None
 472        The scale of the decimal to be quantized.
 473
 474    Returns
 475    -------
 476    A string of the input decimal or the input if not a Decimal.
 477    """
 478    if not isinstance(x, Decimal):
 479        return x
 480
 481    if value_is_null(x):
 482        return None
 483
 484    if quantize and scale and precision:
 485        x = quantize_decimal(x, precision, scale)
 486
 487    return f"{x:f}"
 488
 489
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip timezone if `strip_utc` is `True`).

    Parameters
    ----------
    dt: Any
        The value to coerce. `None` and integers are returned unchanged.
        Strings are parsed with `dateutil` (and returned unchanged if parsing fails).

    strip_utc: bool, default False
        If `True`, return a timezone-naive value instead of a UTC-aware one.

    Returns
    -------
    The coerced value (or the input itself for `None`, integers, and unparsable strings).
    """
    if dt is None:
        return None

    ### Integer timestamps carry no timezone information, so pass them through.
    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt)
        except Exception:
            return dt

    ### Duck-type check for Series-like objects (anything with `dtype` and `__module__`).
    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
    if dt_is_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        ### Short-circuit when the series already has the requested awareness:
        ### any timezone when keeping the timezone, or naive when stripping it.
        ### NOTE: An aware series is returned as-is here; no conversion is performed.
        if (
            pandas.api.types.is_datetime64_any_dtype(dt) and (
                (dt.dt.tz is not None and not strip_utc)
                or
                (dt.dt.tz is None and strip_utc)
            )
        ):
            return dt

        dt_series = to_datetime(dt, coerce_utc=False)
        ### Naive values are labeled as UTC.
        if dt_series.dt.tz is None:
            dt_series = dt_series.dt.tz_localize(timezone.utc)
        if strip_utc:
            ### Best-effort: if the timezone cannot be removed, keep the aware series.
            try:
                if dt_series.dt.tz is not None:
                    dt_series = dt_series.dt.tz_localize(None)
            except Exception:
                pass

        return dt_series

    ### Scalar values: naive datetimes are labeled as UTC (unless stripping)...
    if dt.tzinfo is None:
        if strip_utc:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    ### ...and aware datetimes are converted to UTC.
    utc_dt = dt.astimezone(timezone.utc)
    if strip_utc:
        return utc_dt.replace(tzinfo=None)
    return utc_dt
 545
 546
def to_datetime(
    dt_val: Any,
    as_pydatetime: bool = False,
    coerce_utc: bool = True,
    precision_unit: Optional[str] = None,
) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The value to coerce to Pandas Timestamps.

    as_pydatetime: bool, default False
        If `True`, return a Python datetime object.

    coerce_utc: bool, default True
        If `True`, ensure the value has UTC tzinfo.

    precision_unit: Optional[str], default None
        If provided, enforce the provided precision unit.
        Otherwise `'microsecond'` is used for conversions.

    Returns
    -------
    The coerced scalar or series. Values which cannot be parsed are returned unchanged.

    Raises
    ------
    A `ValueError` if `precision_unit` is not a known unit or alias.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    ### Use `dask.dataframe` in place of `pandas` when the value comes from a dask module.
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    ### Duck-type check for Series-like objects (anything with `dtype` and `__module__`).
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd
    ### Only insist on an exact precision match when the caller asked for one.
    enforce_precision = precision_unit is not None
    precision_unit = precision_unit or 'microsecond'
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
    if not precision_abbreviation:
        raise ValueError(f"Invalid precision '{precision_unit}'.")

    ### Lenient parser: return the input unchanged if `dateutil` cannot parse it.
    def parse(x: Any) -> Any:
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    ### Return whether a dtype string is already acceptable, i.e. no cast is needed.
    ### With an enforced precision, the dtype must match exactly;
    ### otherwise any `datetime64[...]` with the requested UTC-awareness will do.
    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
        dtype_check_against = (
            f"datetime64[{precision_abbreviation}, UTC]"
            if with_utc
            else f"datetime64[{precision_abbreviation}]"
        )
        return (
            dtype_to_check == dtype_check_against
            if enforce_precision
            else (
                dtype_to_check.startswith('datetime64[')
                and (
                    ('utc' in dtype_to_check.lower())
                    if with_utc
                    else ('utc' not in dtype_to_check.lower())
                )
            )
        )

    ### Timestamps need no parsing, only the optional conversions.
    if isinstance(dt_val, pd.Timestamp):
        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
        return (
            coerce_timezone(dt_val_to_return)
            if coerce_utc
            else dt_val_to_return
        )

    if dt_is_series:
        changed_tz = False
        original_tz = None
        dtype = str(getattr(dt_val, 'dtype', 'object'))
        ### Temporarily localize non-UTC datetime series to UTC so the casts below apply.
        ### The original timezone is restored at the end if `coerce_utc` is `False`.
        if (
            are_dtypes_equal(dtype, 'datetime')
            and 'utc' not in dtype.lower()
            and hasattr(dt_val, 'dt')
        ):
            original_tz = dt_val.dt.tz
            dt_val = dt_val.dt.tz_localize(timezone.utc)
            changed_tz = True
            dtype = str(getattr(dt_val, 'dtype', 'object'))
        try:
            new_dt_series = (
                dt_val
                if check_dtype(dtype, with_utc=True)
                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
            )
        except pd.errors.OutOfBoundsDatetime:
            ### Retry the cast with the unit returned by `get_next_precision_unit()`
            ### (defined outside this block; presumably a coarser unit -- TODO confirm).
            try:
                next_precision = get_next_precision_unit(true_precision_unit)
                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
            except Exception:
                new_dt_series = None
        except ValueError:
            new_dt_series = None
        except TypeError:
            ### NOTE(review): `new_dt_series` appears to be unbound here because the assignment
            ### in the outer `try` failed before binding it. If so, the lookup below raises
            ### `UnboundLocalError`, which `except Exception` turns into `None`,
            ### and the naive `astype()` cast is never attempted. Confirm the intent
            ### (`dt_val` may have been meant).
            try:
                new_dt_series = (
                    new_dt_series
                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
                )
            except Exception:
                new_dt_series = None

        ### Last resort: parse each element's string form individually.
        if new_dt_series is None:
            new_dt_series = dt_val.apply(lambda x: parse(str(x)))

        if coerce_utc:
            return coerce_timezone(new_dt_series)

        ### Undo the temporary UTC localization from above.
        if changed_tz:
            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
        return new_dt_series

    ### Scalar (non-Timestamp) values: try the strict ISO 8601 parser first.
    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if new_dt_val.unit != precision_abbreviation:
            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        pass

    ### Fall back to `dateutil` when the value is out-of-bounds or not ISO 8601.
    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)
 677
 678
 679def serialize_bytes(data: bytes) -> str:
 680    """
 681    Return the given bytes as a base64-encoded string.
 682    """
 683    import base64
 684    if not isinstance(data, bytes) and value_is_null(data):
 685        return data
 686    return base64.b64encode(data).decode('utf-8')
 687
 688
 689def serialize_geometry(
 690    geom: Any,
 691    geometry_format: str = 'wkb_hex',
 692    srid: Optional[int] = None,
 693) -> Union[str, Dict[str, Any], None]:
 694    """
 695    Serialize geometry data as a hex-encoded well-known-binary string. 
 696
 697    Parameters
 698    ----------
 699    geom: Any
 700        The potential geometry data to be serialized.
 701
 702    geometry_format: str, default 'wkb_hex'
 703        The serialization format for geometry data.
 704        Accepted formats are `wkb_hex` (well-known binary hex string),
 705        `wkt` (well-known text), and `geojson`.
 706
 707    srid: Optional[int], default None
 708        If provided, use this as the source CRS when serializing to GeoJSON.
 709
 710    Returns
 711    -------
 712    A string containing the geometry data.
 713    """
 714    if value_is_null(geom):
 715        return None
 716    shapely, shapely_ops, pyproj = mrsm.attempt_import(
 717        'shapely', 'shapely.ops', 'pyproj',
 718        lazy=False,
 719    )
 720    if geometry_format == 'geojson':
 721        if srid:
 722            transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
 723            geom = shapely_ops.transform(transformer.transform, geom)
 724        geojson_str = shapely.to_geojson(geom)
 725        return json.loads(geojson_str)
 726
 727    if hasattr(geom, 'wkb_hex'):
 728        if geometry_format == "wkb_hex":
 729            return shapely.to_wkb(geom, hex=True, include_srid=True)
 730        return shapely.to_wkt(geom)
 731
 732    return str(geom)
 733
 734
 735def deserialize_geometry(geom_wkb: Union[str, bytes]):
 736    """
 737    Deserialize a WKB string into a shapely geometry object.
 738    """
 739    shapely = mrsm.attempt_import('shapely', lazy=False)
 740    return shapely.wkb.loads(geom_wkb)
 741
 742
 743def project_geometry(geom, srid: int, to_srid: int = 4326):
 744    """
 745    Project a shapely geometry object to a new CRS (SRID).
 746    """
 747    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
 748    transformer = pyproj.Transformer.from_crs(f"EPSG:{srid}", f"EPSG:{to_srid}", always_xy=True)
 749    return shapely_ops.transform(transformer.transform, geom)
 750
 751
 752def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
 753    """
 754    Given a serialized ASCII string of bytes data, return the original bytes.
 755    The input data may either be base64- or hex-encoded.
 756
 757    Parameters
 758    ----------
 759    data: Optional[str]
 760        The string to be deserialized into bytes.
 761        May be base64- or hex-encoded (prefixed with `'\\x'`).
 762
 763    force_hex: bool = False
 764        If `True`, treat the input string as hex-encoded.
 765        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
 766        This will still strip the leading `'\\x'` prefix if present.
 767
 768    Returns
 769    -------
 770    The original bytes used to produce the encoded string `data`.
 771    """
 772    if not isinstance(data, str) and value_is_null(data):
 773        return data
 774
 775    import binascii
 776    import base64
 777
 778    is_hex = force_hex or data.startswith('\\x')
 779
 780    if is_hex:
 781        if data.startswith('\\x'):
 782            data = data[2:]
 783        return binascii.unhexlify(data)
 784
 785    return base64.b64decode(data)
 786
 787
 788def deserialize_base64(data: str) -> bytes:
 789    """
 790    Return the original bytestring from the given base64-encoded string.
 791    """
 792    import base64
 793    return base64.b64decode(data)
 794
 795
 796def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
 797    """
 798    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
 799    """
 800    import binascii
 801    if not isinstance(data, bytes) and value_is_null(data):
 802        return data
 803    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
 804
 805
 806def serialize_datetime(dt: datetime) -> Union[str, None]:
 807    """
 808    Serialize a datetime object into JSON (ISO format string).
 809
 810    Examples
 811    --------
 812    >>> import json
 813    >>> from datetime import datetime
 814    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
 815    '{"a": "2022-01-01T00:00:00Z"}'
 816
 817    """
 818    if not hasattr(dt, 'isoformat'):
 819        return None
 820
 821    tz_suffix = 'Z' if getattr(dt, 'tzinfo', None) is None else ''
 822    return dt.isoformat() + tz_suffix
 823
 824
 825def serialize_date(d: date) -> Union[str, None]:
 826    """
 827    Serialize a date object into its ISO representation.
 828    """
 829    return d.isoformat() if hasattr(d, 'isoformat') else None
 830
 831
 832def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
 833    """
 834    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
 835
 836    Parameters
 837    ----------
 838    x: Any
 839        The value to serialize.
 840
 841    default_to_str: bool, default True
 842        If `True`, return a string of `x` if x is not a designated type.
 843        Otherwise return x.
 844
 845    Returns
 846    -------
 847    A serialized version of x, or x.
 848    """
 849    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
 850        return x.meta
 851
 852    if hasattr(x, 'tzinfo'):
 853        return serialize_datetime(x)
 854
 855    if hasattr(x, 'isoformat'):
 856        return serialize_date(x)
 857
 858    if isinstance(x, bytes):
 859        return serialize_bytes(x)
 860
 861    if isinstance(x, Decimal):
 862        return serialize_decimal(x)
 863
 864    if 'shapely' in str(type(x)):
 865        return serialize_geometry(x)
 866
 867    if value_is_null(x):
 868        return None
 869
 870    if isinstance(x, (dict, list, tuple)):
 871        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))
 872
 873    return str(x) if default_to_str else x
 874
 875
 876def get_geometry_type_srid(
 877    dtype: str = 'geometry',
 878    default_type: str = 'geometry',
 879    default_srid: int = 4326,
 880) -> Union[Tuple[str, int], Tuple[str, None]]:
 881    """
 882    Given the specified geometry `dtype`, return a tuple in the form (type, SRID).
 883
 884    Parameters
 885    ----------
 886    dtype: Optional[str], default None
 887        Optionally provide a specific `geometry` syntax (e.g. `geometry[MultiLineString, 4326]`).
 888        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:
 889
 890        - `Point`
 891        - `LineString`
 892        - `LinearRing`
 893        - `Polygon`
 894        - `MultiPoint`
 895        - `MultiLineString`
 896        - `MultiPolygon`
 897        - `GeometryCollection`
 898
 899    Returns
 900    -------
 901    A tuple in the form (type, SRID).
 902    Defaults to `(default_type, default_srid)`.
 903
 904    Examples
 905    --------
 906    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
 907    >>> get_geometry_type_srid()
 908    ('geometry', 4326)
 909    >>> get_geometry_type_srid('geometry[]')
 910    ('geometry', 4326)
 911    >>> get_geometry_type_srid('geometry[Point, 0]')
 912    ('Point', 0)
 913    >>> get_geometry_type_srid('geometry[0, Point]')
 914    ('Point', 0)
 915    >>> get_geometry_type_srid('geometry[0]')
 916    ('geometry', 0)
 917    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
 918    ('MultiLineString', 4326)
 919    >>> get_geometry_type_srid('geography')
 920    ('geometry', 4326)
 921    >>> get_geometry_type_srid('geography[POINT]')
 922    ('Point', 4376)
 923    """
 924    from meerschaum.utils.misc import is_int
 925    ### NOTE: PostGIS syntax must also be parsed.
 926    dtype = dtype.replace('(', '[').replace(')', ']')
 927    bare_dtype = dtype.split('[', maxsplit=1)[0]
 928    modifier = dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
 929    if not modifier:
 930        return default_type, default_srid
 931
 932    parts = [
 933        part.split('=')[-1].strip()
 934        for part in modifier.split(',')
 935    ]
 936    parts_casted = [
 937        (
 938            int(part)
 939            if is_int(part)
 940            else part
 941        )
 942        for part in parts
 943    ]
 944
 945    srid = default_srid
 946    geometry_type = default_type
 947
 948    for part in parts_casted:
 949        if isinstance(part, int):
 950            srid = part
 951            break
 952
 953    for part in parts_casted:
 954        if isinstance(part, str):
 955            geometry_type = part
 956            break
 957
 958    return geometry_type, srid
 959
 960
 961def get_current_timestamp(
 962    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
 963    precision_interval: int = 1,
 964    round_to: str = 'down',
 965    as_pandas: bool = False,
 966    as_int: bool = False,
 967    _now: Union[datetime, int, None] = None,
 968) -> 'Union[datetime, pd.Timestamp, int]':
 969    """
 970    Return the current UTC timestamp to nanosecond precision.
 971
 972    Parameters
 973    ----------
 974    precision_unit: str, default 'us'
 975        The precision of the timestamp to be returned.
 976        Valid values are the following:
 977            - `ns` / `nanosecond`
 978            - `us` / `microsecond`
 979            - `ms` / `millisecond`
 980            - `s` / `sec` / `second`
 981            - `m` / `min` / `minute`
 982            - `h` / `hr` / `hour`
 983            - `d` / `day`
 984
 985    precision_interval: int, default 1
 986        Round the timestamp to the `precision_interval` units.
 987        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
 988        Note: `precision_interval` must be 1 when `precision='nanosecond'`.
 989
 990    round_to: str, default 'down'
 991        The direction to which to round the timestamp.
 992        Available options are `down`, `up`, and `closest`.
 993
 994    as_pandas: bool, default False
 995        If `True`, return a Pandas Timestamp.
 996        This is always true if `unit` is `nanosecond`.
 997
 998    as_int: bool, default False
 999        If `True`, return the timestamp to an integer.
1000        Overrides `as_pandas`.
1001
1002    Returns
1003    -------
1004    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
1005
1006    Examples
1007    --------
1008    >>> get_current_timestamp('ns')
1009    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
1010    >>> get_current_timestamp('ms')
1011    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
1012    """
1013    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1014    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
1015        from meerschaum.utils.misc import items_str
1016        raise ValueError(
1017            f"Unknown precision unit '{precision_unit}'. "
1018            "Accepted values are "
1019            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
1020        )
1021
1022    if not as_int:
1023        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
1024    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None
1025
1026    if true_precision_unit == 'nanosecond':
1027        if precision_interval != 1:
1028            warn("`precision_interval` must be 1 for nanosecond precision.")
1029        now_ts = time.time_ns() if not isinstance(_now, int) else _now
1030        if as_int:
1031            return now_ts
1032        return pd.to_datetime(now_ts, unit='ns', utc=True)
1033
1034    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
1035    delta = timedelta(**{true_precision_unit + 's': precision_interval})
1036    rounded_now = round_time(now, delta, to=round_to)
1037
1038    if as_int:
1039        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])
1040
1041    ts_val = (
1042        pd.to_datetime(rounded_now, utc=True)
1043        if as_pandas
1044        else rounded_now
1045    )
1046
1047    if not as_pandas:
1048        return ts_val
1049
1050    as_unit_precisions = ('microsecond', 'millisecond', 'second')
1051    if true_precision_unit not in as_unit_precisions:
1052        return ts_val
1053
1054    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])
1055
1056
def is_dtype_special(type_: str) -> bool:
    """
    Report whether a dtype is one of the special Meerschaum dtypes.
    Aliases are resolved first, but being an alias alone does not make a dtype special.

    Parameters
    ----------
    type_: str
        The dtype string (or alias) to check.

    Returns
    -------
    `True` if the resolved dtype is a special dtype
    (or a parametrized form such as `numeric[10,2]`), otherwise `False`.
    """
    resolved = MRSM_ALIAS_DTYPES.get(type_, type_)

    exact_names = (
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    )
    if resolved in exact_names:
        return True

    ### Catch equivalent spellings of the temporal dtypes.
    if any(are_dtypes_equal(resolved, family) for family in ('datetime', 'date')):
        return True

    ### Parametrized dtypes carry a bracketed suffix, so match on the prefix.
    return resolved.startswith(('numeric', 'bool', 'geometry', 'geography'))
1095
1096
def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Get the neighboring precision unit in order of granularity.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).

    decrease: bool, default True
        If `True`, return the next coarser unit (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the next finer unit (e.g. `hour` -> `minute`).

    Returns
    -------
    The full name of the precision unit adjacent to the given precision unit.

    Raises
    ------
    ValueError
        If `precision_unit` is unknown or has no neighbor in the requested direction.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    canonical_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if not MRSM_PRECISION_UNITS_SCALARS.get(canonical_unit, None):
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    ### Ascending by units-per-second: coarsest unit first, finest last.
    ordered_units = sorted(
        MRSM_PRECISION_UNITS_SCALARS,
        key=MRSM_PRECISION_UNITS_SCALARS.get,
    )

    step = -1 if decrease else 1
    target_index = ordered_units.index(canonical_unit) + step
    if not 0 <= target_index < len(ordered_units):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return ordered_units[target_index]
1139
1140
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Snap a datetime onto a grid whose spacing is a given timedelta.

    Parameters
    ----------
    dt: Optional[datetime], default None
        The datetime to round. If `None`, use the current UTC time (as a naive datetime).

    date_delta: Optional[timedelta], default None
        The grid spacing. If `None`, use one minute.
        A zero-length delta returns `dt` unchanged.

    to: 'str', default 'down'
        The rounding direction: `'down'`, `'up'`, or `'closest'`.
        Any other value behaves like `'closest'`.

    Returns
    -------
    A rounded `datetime` object (any `tzinfo` on `dt` is kept).

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP

    one_microsecond = timedelta(microseconds=1)

    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    step_us = date_delta // one_microsecond
    if step_us == 0:
        return dt

    ### Measure the wall-clock offset from `datetime.min` so the grid is anchored at midnight.
    elapsed_us = (dt.replace(tzinfo=None) - datetime.min) // one_microsecond

    rounding_mode = (
        ROUND_DOWN
        if to == 'down'
        else (ROUND_UP if to == 'up' else ROUND_HALF_UP)
    )
    step_dec = Decimal(step_us)
    interval_count = (Decimal(elapsed_us) / step_dec).to_integral_value(rounding=rounding_mode)

    shift_us = int(interval_count * step_dec) - elapsed_us
    return dt + timedelta(microseconds=shift_us)
### Alternate spellings mapped onto the canonical Meerschaum dtype names.
MRSM_ALIAS_DTYPES: Dict[str, str] = {
    'decimal': 'numeric',
    'Decimal': 'numeric',
    'number': 'numeric',
    'jsonl': 'json',
    'JSON': 'json',
    'binary': 'bytes',
    'blob': 'bytes',
    'varbinary': 'bytes',
    'bytea': 'bytes',
    'guid': 'uuid',
    'UUID': 'uuid',
    'geom': 'geometry',
    'geog': 'geography',
    'boolean': 'bool',
    'day': 'date',
}
### Canonical Meerschaum dtypes mapped onto Pandas dtype strings.
MRSM_PD_DTYPES: Dict[Optional[str], str] = {
    'json': 'object',
    'numeric': 'object',
    'geometry': 'object',
    'geography': 'object',
    'uuid': 'object',
    'date': 'date32[day][pyarrow]',
    'datetime': 'datetime64[us, UTC]',
    'bool': 'bool[pyarrow]',
    'int': 'int64[pyarrow]',
    'int8': 'int8[pyarrow]',
    'int16': 'int16[pyarrow]',
    'int32': 'int32[pyarrow]',
    'int64': 'int64[pyarrow]',
    'str': 'string',
    'bytes': 'binary[pyarrow]',
    None: 'object',
}
### Number of each unit contained in one second.
MRSM_PRECISION_UNITS_SCALARS: Dict[str, Union[int, float]] = {
    'nanosecond': 1_000_000_000,
    'microsecond': 1_000_000,
    'millisecond': 1000,
    'second': 1,
    'minute': (1 / 60),
    'hour': (1 / 3600),
    'day': (1 / 86400),
}
### Abbreviations accepted in place of the full unit names.
MRSM_PRECISION_UNITS_ALIASES: Dict[str, str] = {
    'ns': 'nanosecond',
    'us': 'microsecond',
    'ms': 'millisecond',
    's': 'second',
    'sec': 'second',
    'm': 'minute',
    'min': 'minute',
    'h': 'hour',
    'hr': 'hour',
    'd': 'day',
    'D': 'day',
}
### Abbreviation used for each full unit name.
MRSM_PRECISION_UNITS_ABBREVIATIONS: Dict[str, str] = {
    'nanosecond': 'ns',
    'microsecond': 'us',
    'millisecond': 'ms',
    'second': 's',
    'minute': 'min',
    'hour': 'hr',
    'day': 'D',
}
def to_pandas_dtype(dtype: str) -> str:
 92def to_pandas_dtype(dtype: str) -> str:
 93    """
 94    Cast a supported Meerschaum dtype to a Pandas dtype.
 95    """
 96    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
 97    if known_dtype is not None:
 98        return known_dtype
 99
100    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
101    if alias_dtype is not None:
102        return MRSM_PD_DTYPES[alias_dtype]
103
104    if dtype.startswith('numeric'):
105        return MRSM_PD_DTYPES['numeric']
106
107    if dtype.startswith('geometry'):
108        return MRSM_PD_DTYPES['geometry']
109
110    if dtype.startswith('geography'):
111        return MRSM_PD_DTYPES['geography']
112
113    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
114    ### treat it as a SQL db type.
115    if dtype.split(' ')[0].isupper():
116        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
117        return get_pd_type_from_db_type(dtype)
118
119    from meerschaum.utils.packages import attempt_import
120    _ = attempt_import('pyarrow', lazy=False)
121    pandas = attempt_import('pandas', lazy=False)
122
123    try:
124        return str(pandas.api.types.pandas_dtype(dtype))
125    except Exception:
126        warn(
127            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
128            + f"{traceback.format_exc()}",
129            stack=False,
130        )
131    return 'object'

Cast a supported Meerschaum dtype to a Pandas dtype.

def are_dtypes_equal( ldtype: Union[str, Dict[str, str]], rdtype: Union[str, Dict[str, str]]) -> bool:
def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    Two dictionaries are equivalent only if they have the same keys
    (compared as strings) and every pair of values is equivalent.
    """
    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
        ltypes = {str(col): typ for col, typ in ldtype.items()}
        rtypes = {str(col): typ for col, typ in rdtype.items()}
        ### NOTE: Compare the full key sets; zipping sorted keys would silently
        ### ignore extra keys when one dictionary is longer than the other.
        if ltypes.keys() != rtypes.keys():
            return False
        return all(
            are_dtypes_equal(ltypes[col], rtypes[col])
            for col in sorted(ltypes)
        )

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ### Drop any bracketed parameters so only the base names are compared.
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    ldtype = MRSM_ALIAS_DTYPES.get(ldtype, ldtype)
    rdtype = MRSM_ALIAS_DTYPES.get(rdtype, rdtype)

    ldtype_lower = ldtype.lower()
    rdtype_lower = rdtype.lower()
    if ldtype_lower == rdtype_lower:
        return True

    ### Families matched on the exact (case-sensitive) base name.
    ### `object` appears in several families because Pandas stores those dtypes as objects.
    exact_families = (
        ('json', 'object'),
        ('numeric', 'decimal', 'object'),
        ('uuid', 'object'),
        ('bytes', 'object', 'binary'),
        ('geometry', 'object', 'geography'),
        ('str', 'string', 'object'),
        ('bool', 'boolean'),
        ('date', 'date32', 'date64'),
    )
    if any(ldtype in family and rdtype in family for family in exact_families):
        return True

    ### Datetimes match on substring so that timezone / unit variants are equivalent.
    datetime_markers = ('datetime', 'timestamp')
    if (
        any(marker in ldtype_lower for marker in datetime_markers)
        and any(marker in rdtype_lower for marker in datetime_markers)
    ):
        return True

    ### Families matched case-insensitively (e.g. `Int64` and `int64`).
    caseless_families = (
        (
            'int', 'int64', 'int32', 'int16', 'int8',
            'uint', 'uint64', 'uint32', 'uint16', 'uint8',
        ),
        ('float', 'float64', 'float32', 'float16', 'float128', 'double'),
    )
    return any(
        ldtype_lower in family and rdtype_lower in family
        for family in caseless_families
    )

Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.

Parameters
  • ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
  • rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
  • A bool indicating whether the two dtypes are to be considered equivalent.
def is_dtype_numeric(dtype: str) -> bool:
def is_dtype_numeric(dtype: str) -> bool:
    """
    Check whether a `dtype` string is compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    `True` if the lower-cased dtype contains any of
    `numeric`, `float`, `double`, or `int`, otherwise `False`.
    """
    lowered = dtype.lower()
    numeric_markers = ('numeric', 'float', 'double', 'int')
    return any(marker in lowered for marker in numeric_markers)

Determine whether a given dtype string should be considered compatible with the Meerschaum dtype numeric.

Parameters
  • dtype (str): The pandas-like dtype string.
Returns
  • A bool indicating the dtype is compatible with numeric.
def attempt_cast_to_numeric( value: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
)-> Any:
    """
    Try to turn a value into a `Decimal`, leaving it untouched on failure.

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.
        Quantization only happens when both `precision` and `scale` are truthy.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.

    Returns
    -------
    A `Decimal` if possible (`Decimal('NaN')` for null-like values), or `value`.
    """
    should_quantize = bool(quantize and precision and scale)

    if isinstance(value, Decimal):
        return quantize_decimal(value, precision, scale) if should_quantize else value

    try:
        if value_is_null(value):
            return Decimal('NaN')

        parsed = Decimal(str(value))
        if should_quantize:
            return quantize_decimal(parsed, precision, scale)
        return parsed
    except Exception:
        ### Best-effort cast: hand back the input if it cannot be parsed or quantized.
        return value

Given a value, attempt to coerce it into a numeric (Decimal).

Parameters
  • value (Any): The value to be cast to a Decimal.
  • quantize (bool, default False): If True, quantize the decimal to the specified precision and scale.
  • precision (Optional[int], default None): If quantize is True, use this precision.
  • scale (Optional[int], default None): If quantize is True, use this scale.
Returns
  • A Decimal if possible, or value.
def attempt_cast_to_uuid(value: Any) -> Any:
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Try to turn a value into a `uuid.UUID`.

    Existing `UUID` objects are returned as-is, null-like values become `None`,
    and values which cannot be parsed are returned unchanged.
    """
    if isinstance(value, uuid.UUID):
        return value

    try:
        if value_is_null(value):
            return None
        return uuid.UUID(str(value))
    except Exception:
        return value

Given a value, attempt to coerce it into a UUID (uuid4).

def attempt_cast_to_bytes(value: Any) -> Any:
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Try to turn a value into a bytestring.

    Existing `bytes` are returned as-is, null-like values become `None`,
    and values which cannot be deserialized are returned unchanged.
    """
    if isinstance(value, bytes):
        return value

    try:
        if value_is_null(value):
            return None
        return deserialize_bytes_string(str(value))
    except Exception:
        return value

Given a value, attempt to coerce it into a bytestring.

def attempt_cast_to_geometry(value: Any) -> Any:
def attempt_cast_to_geometry(value: Any) -> Any:
    """
    Try to turn a value into a `shapely` (`geometry`) object.

    Shapely objects pass through, dictionaries and lists are parsed as GeoJSON,
    and strings / bytes are parsed as WKT or WKB.
    Values which cannot be parsed are returned unchanged.
    """
    shapely, shapely_wkt, shapely_wkb = mrsm.attempt_import(
        'shapely',
        'shapely.wkt',
        'shapely.wkb',
        lazy=False,
    )
    already_shapely = 'shapely' in str(type(value))
    if already_shapely:
        return value

    if isinstance(value, (dict, list)):
        try:
            return shapely.from_geojson(json.dumps(value))
        except Exception:
            return value

    ### `None` means the value looks like neither WKT nor WKB.
    is_wkt = geometry_is_wkt(value)
    if is_wkt is None:
        return value

    try:
        loader = shapely_wkt.loads if is_wkt else shapely_wkb.loads
        return loader(value)
    except Exception:
        return value

Given a value, attempt to coerce it into a shapely (geometry) object.

def geometry_is_wkt(value: Union[str, bytes]) -> Optional[bool]:
def geometry_is_wkt(value: Union[str, bytes]) -> Union[bool, None]:
    """
    Determine whether an input value should be treated as WKT or WKB geometry data.

    Parameters
    ----------
    value: Union[str, bytes]
        The input data to be parsed into geometry data.

    Returns
    -------
    A `bool` (`True` if `value` is WKT and `False` if it should be treated as WKB).
    Return `None` if `value` should be parsed as neither
    (including empty strings and non-string, non-bytes values).

    Examples
    --------
    >>> geometry_is_wkt('POINT (1 2)')
    True
    >>> geometry_is_wkt('POINT Z (1 2 3)')
    True
    >>> geometry_is_wkt('POLYGON EMPTY')
    True
    >>> geometry_is_wkt('0101000000000000000000F03F0000000000000040')
    False
    >>> geometry_is_wkt('') is None
    True
    """
    import re

    ### Raw bytes are always treated as WKB.
    if isinstance(value, bytes):
        return False

    ### NOTE: An empty string would otherwise pass the hex check below
    ### (`all()` of nothing is `True`) and be misclassified as WKB.
    if not isinstance(value, str) or not value:
        return None

    ### Accept the optional dimension tag (`Z`, `M`, `ZM`) and `EMPTY` geometries
    ### in addition to the plain `TYPE (...)` form.
    wkt_pattern = (
        r'^\s*(POINT|LINESTRING|POLYGON|MULTIPOINT|MULTILINESTRING|MULTIPOLYGON|GEOMETRYCOLLECTION)'
        r'(\s+(ZM|Z|M))?(\s*\(.*\)|\s+EMPTY)\s*$'
    )
    if re.match(wkt_pattern, value, re.IGNORECASE):
        return True

    ### Hex-encoded WKB: only hex digits, two per byte.
    if all(c in '0123456789ABCDEFabcdef' for c in value) and len(value) % 2 == 0:
        return False

    return None

Determine whether an input value should be treated as WKT or WKB geometry data.

Parameters
  • value (Union[str, bytes]): The input data to be parsed into geometry data.
Returns
  • A bool (True if value is WKT and False if it should be treated as WKB).
  • Return None if value should be parsed as neither.
def value_is_null(value: Any) -> bool:
def value_is_null(value: Any) -> bool:
    """
    Check whether a value's string form (case-insensitive) is a null-like token.
    """
    null_tokens = ('none', 'nan', 'na', 'nat', 'natz', '', '<na>')
    lowered = str(value).lower()
    return lowered in null_tokens

Determine if a value is a null-like string.

def none_if_null(value: Any) -> Any:
def none_if_null(value: Any) -> Any:
    """
    Replace a null-like value with `None`; pass every other value through.
    """
    if value_is_null(value):
        return None
    return value

Return None if a value is a null-like string.

def quantize_decimal(x: decimal.Decimal, precision: int, scale: int) -> decimal.Decimal:
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Round a `Decimal` to a fixed scale within a maximum precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision (rounding half up).

    Raises
    ------
    ValueError
        If `x` cannot be represented with the given precision and scale.
    """
    ### Only the exponent of this template matters to `quantize()`.
    integer_digits = '1' * (precision - scale)
    fractional_digits = '1' * scale
    exponent_template = Decimal(integer_digits + '.' + fractional_digits)

    try:
        quantized = x.quantize(
            exponent_template,
            rounding=ROUND_HALF_UP,
            context=Context(prec=precision),
        )
    except InvalidOperation:
        quantized = None

    if quantized is None:
        raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")

    return quantized

Quantize a given Decimal to a known scale and precision.

Parameters
  • x (Decimal): The Decimal to be quantized.
  • precision (int): The total number of significant digits.
  • scale (int): The number of significant digits after the decimal point.
Returns
  • A Decimal quantized to the specified scale and precision.
def serialize_decimal( x: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Render a `Decimal` as a fixed-point string, optionally quantizing it first.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization. Both `scale` and `precision` must be truthy.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.

    Returns
    -------
    A string of the input decimal, `None` for a null-like Decimal,
    or the input itself if it is not a Decimal.
    """
    if isinstance(x, Decimal):
        if value_is_null(x):
            return None

        to_render = (
            quantize_decimal(x, precision, scale)
            if (quantize and scale and precision)
            else x
        )
        return f"{to_render:f}"

    return x

Return a quantized string of an input decimal.

Parameters
  • x (Any): The potential decimal to be serialized.
  • quantize (bool, default False): If True, quantize the incoming Decimal to the specified scale and precision before serialization.
  • precision (Optional[int], default None): The precision of the decimal to be quantized.
  • scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
  • A string of the input decimal or the input if not a Decimal.
def coerce_timezone(dt: Any, strip_utc: bool = False) -> Any:
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip timezone if `strip_utc` is `True`).

    `None` and integers are returned unchanged.
    Strings are parsed with `dateutil` and returned as-is if parsing fails.
    """
    if dt is None:
        return None

    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt)
        except Exception:
            return dt

    looks_like_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')
    if looks_like_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        ### Nothing to do if the series is already in the requested state:
        ### tz-aware when keeping UTC, tz-naive when stripping it.
        if (
            pandas.api.types.is_datetime64_any_dtype(dt)
            and (dt.dt.tz is None) == bool(strip_utc)
        ):
            return dt

        utc_series = to_datetime(dt, coerce_utc=False)
        if utc_series.dt.tz is None:
            utc_series = utc_series.dt.tz_localize(timezone.utc)

        if strip_utc:
            try:
                if utc_series.dt.tz is not None:
                    utc_series = utc_series.dt.tz_localize(None)
            except Exception:
                pass

        return utc_series

    if dt.tzinfo is None:
        ### Naive values are assumed to already be in UTC.
        return dt if strip_utc else dt.replace(tzinfo=timezone.utc)

    converted = dt.astimezone(timezone.utc)
    return converted.replace(tzinfo=None) if strip_utc else converted

Given a datetime, pandas Timestamp or Series of Timestamp, return a UTC timestamp (strip timezone if strip_utc is True).

def to_datetime( dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True, precision_unit: Optional[str] = None) -> Any:
def to_datetime(
    dt_val: Any,
    as_pydatetime: bool = False,
    coerce_utc: bool = True,
    precision_unit: Optional[str] = None,
) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The value to coerce to Pandas Timestamps.

    as_pydatetime: bool, default False
        If `True`, return a Python datetime object.

    coerce_utc: bool, default True
        If `True`, ensure the value has UTC tzinfo.

    precision_unit: Optional[str], default None
        If provided, enforce the provided precision unit.
        When omitted, `microsecond` is used as the target unit for casts.

    Returns
    -------
    The converted value: a Timestamp (or Python datetime) for scalars,
    a series for series-like inputs, or the result of a `dateutil` parse
    (the input itself if that also fails) when Pandas cannot convert a scalar.

    Raises
    ------
    ValueError
        If `precision_unit` is not a recognized precision unit or alias.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    ### Detect Dask objects by their module name so the Dask namespace is used below.
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd
    ### Only require an exact dtype match when the caller explicitly asked for a unit.
    enforce_precision = precision_unit is not None
    precision_unit = precision_unit or 'microsecond'
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    precision_abbreviation = MRSM_PRECISION_UNITS_ABBREVIATIONS.get(true_precision_unit, None)
    if not precision_abbreviation:
        raise ValueError(f"Invalid precision '{precision_unit}'.")

    def parse(x: Any) -> Any:
        ### Best-effort `dateutil` parse; return the input unchanged on failure.
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    def check_dtype(dtype_to_check: str, with_utc: bool = True) -> bool:
        ### Return whether a dtype string already satisfies the target:
        ### an exact match when enforcing precision, otherwise any `datetime64[...]`
        ### whose UTC-ness matches `with_utc`.
        dtype_check_against = (
            f"datetime64[{precision_abbreviation}, UTC]"
            if with_utc
            else f"datetime64[{precision_abbreviation}]"
        )
        return (
            dtype_to_check == dtype_check_against
            if enforce_precision
            else (
                dtype_to_check.startswith('datetime64[')
                and (
                    ('utc' in dtype_to_check.lower())
                    if with_utc
                    else ('utc' not in dtype_to_check.lower())
                )
            )
        )

    ### Scalar Timestamps pass through (optionally converted to Python datetimes).
    if isinstance(dt_val, pd.Timestamp):
        dt_val_to_return = dt_val if not as_pydatetime else dt_val.to_pydatetime()
        return (
            coerce_timezone(dt_val_to_return)
            if coerce_utc
            else dt_val_to_return
        )

    if dt_is_series:
        changed_tz = False
        original_tz = None
        dtype = str(getattr(dt_val, 'dtype', 'object'))
        ### Localize datetime series whose dtype does not mention UTC,
        ### remembering the original timezone so it may be restored below.
        if (
            are_dtypes_equal(dtype, 'datetime')
            and 'utc' not in dtype.lower()
            and hasattr(dt_val, 'dt')
        ):
            original_tz = dt_val.dt.tz
            dt_val = dt_val.dt.tz_localize(timezone.utc)
            changed_tz = True
            dtype = str(getattr(dt_val, 'dtype', 'object'))
        try:
            new_dt_series = (
                dt_val
                if check_dtype(dtype, with_utc=True)
                else dt_val.astype(f"datetime64[{precision_abbreviation}, UTC]")
            )
        except pd.errors.OutOfBoundsDatetime:
            ### Retry with the next coarser precision unit.
            try:
                next_precision = get_next_precision_unit(true_precision_unit)
                next_precision_abbrevation = MRSM_PRECISION_UNITS_ABBREVIATIONS[next_precision]
                new_dt_series = dt_val.astype(f"datetime64[{next_precision_abbrevation}, UTC]")
            except Exception:
                new_dt_series = None
        except ValueError:
            new_dt_series = None
        except TypeError:
            ### NOTE(review): `new_dt_series` appears to be unbound here because the
            ### assignment in the `try` above did not complete. If so, the expression
            ### below raises and this branch always falls through to `None` — confirm
            ### whether `dt_val` / `dtype` were intended instead.
            try:
                new_dt_series = (
                    new_dt_series
                    if check_dtype(str(getattr(new_dt_series, 'dtype', None)), with_utc=False)
                    else dt_val.astype(f"datetime64[{precision_abbreviation}]")
                )
            except Exception:
                new_dt_series = None

        ### Last resort: parse each element's string form individually.
        if new_dt_series is None:
            new_dt_series = dt_val.apply(lambda x: parse(str(x)))

        if coerce_utc:
            return coerce_timezone(new_dt_series)

        ### Undo the temporary UTC localization when the caller opted out of UTC coercion.
        if changed_tz:
            new_dt_series = new_dt_series.dt.tz_localize(original_tz)
        return new_dt_series

    ### Scalar (non-Timestamp) values: try Pandas first, then fall back to `dateutil`.
    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if new_dt_val.unit != precision_abbreviation:
            new_dt_val = new_dt_val.as_unit(precision_abbreviation)
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        pass

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)

Wrap pd.to_datetime() and add support for out-of-bounds values.

Parameters
  • dt_val (Any): The value to coerce to Pandas Timestamps.
  • as_pydatetime (bool, default False): If True, return a Python datetime object.
  • coerce_utc (bool, default True): If True, ensure the value has UTC tzinfo.
  • precision_unit (Optional[str], default None): If provided, enforce the provided precision unit.
def serialize_bytes(data: bytes) -> str:
def serialize_bytes(data: bytes) -> str:
    """
    Encode the given bytes as a base64 string (decoded as UTF-8 text).

    Values which are not `bytes` and are considered null by `value_is_null()`
    are handed back unchanged.
    """
    from base64 import b64encode

    # Real bytes are always encoded; the null check only applies to other types.
    is_encodable = isinstance(data, bytes) or not value_is_null(data)
    if is_encodable:
        return b64encode(data).decode('utf-8')
    return data

Return the given bytes as a base64-encoded string.

def serialize_geometry( geom: Any, geometry_format: str = 'wkb_hex', srid: Optional[int] = None) -> Union[str, Dict[str, Any], NoneType]:
def serialize_geometry(
    geom: Any,
    geometry_format: str = 'wkb_hex',
    srid: Optional[int] = None,
) -> Union[str, Dict[str, Any], None]:
    """
    Serialize geometry data to the requested format.

    Parameters
    ----------
    geom: Any
        The potential geometry data to be serialized.

    geometry_format: str, default 'wkb_hex'
        The serialization format for geometry data.
        `geojson` returns the parsed GeoJSON document,
        `wkb_hex` returns a hex-encoded well-known binary string (SRID included),
        and any other value (e.g. `wkt`) returns well-known text.

    srid: Optional[int], default None
        If provided, use this as the source CRS when serializing to GeoJSON
        (the geometry is transformed from `EPSG:{srid}` to `EPSG:4326` first).

    Returns
    -------
    `None` for null input, the parsed GeoJSON for `geojson`,
    otherwise a string containing the geometry data.
    Objects without a `wkb_hex` attribute are returned as `str(geom)`.
    """
    if value_is_null(geom):
        return None

    shapely, shapely_ops, pyproj = mrsm.attempt_import(
        'shapely', 'shapely.ops', 'pyproj',
        lazy=False,
    )

    if geometry_format == 'geojson':
        if srid:
            # Reproject into EPSG:4326 before emitting GeoJSON.
            to_4326 = pyproj.Transformer.from_crs(f"EPSG:{srid}", "EPSG:4326", always_xy=True)
            geom = shapely_ops.transform(to_4326.transform, geom)
        return json.loads(shapely.to_geojson(geom))

    # Anything that does not look like a shapely geometry is simply stringified.
    if not hasattr(geom, 'wkb_hex'):
        return str(geom)

    if geometry_format == "wkb_hex":
        return shapely.to_wkb(geom, hex=True, include_srid=True)

    return shapely.to_wkt(geom)

Serialize geometry data as a hex-encoded well-known-binary string.

Parameters
  • geom (Any): The potential geometry data to be serialized.
  • geometry_format (str, default 'wkb_hex'): The serialization format for geometry data. Accepted formats are wkb_hex (well-known binary hex string), wkt (well-known text), and geojson.
  • srid (Optional[int], default None): If provided, use this as the source CRS when serializing to GeoJSON.
Returns
  • A string containing the geometry data.
def deserialize_geometry(geom_wkb: Union[str, bytes]):
def deserialize_geometry(geom_wkb: Union[str, bytes]):
    """
    Load a shapely geometry object from its well-known binary (WKB) representation.

    The value is passed directly to `shapely.wkb.loads()`.
    """
    wkb_module = mrsm.attempt_import('shapely', lazy=False).wkb
    return wkb_module.loads(geom_wkb)

Deserialize a WKB string into a shapely geometry object.

def project_geometry(geom, srid: int, to_srid: int = 4326):
def project_geometry(geom, srid: int, to_srid: int = 4326):
    """
    Reproject a shapely geometry object from one CRS (SRID) into another.

    Parameters
    ----------
    geom:
        The shapely geometry object to project.

    srid: int
        The EPSG code of the geometry's current CRS.

    to_srid: int, default 4326
        The EPSG code of the target CRS.

    Returns
    -------
    The result of `shapely.ops.transform()` applied to `geom`.
    """
    pyproj, shapely_ops = mrsm.attempt_import('pyproj', 'shapely.ops', lazy=False)
    source_crs = f"EPSG:{srid}"
    target_crs = f"EPSG:{to_srid}"
    reproject = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True).transform
    return shapely_ops.transform(reproject, geom)

Project a shapely geometry object to a new CRS (SRID).

def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Optional[bytes]:
def deserialize_bytes_string(data: Optional[str], force_hex: bool = False) -> Union[bytes, None]:
    """
    Decode a serialized ASCII string back into the bytes it represents.
    The string may be either base64- or hex-encoded.

    Parameters
    ----------
    data: Optional[str]
        The string to be deserialized into bytes.
        Strings prefixed with `'\\x'` are treated as hex; everything else as base64.

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded even without the `'\\x'` prefix.
        A leading `'\\x'` prefix is still stripped if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    Null values which are not strings are returned unchanged.
    """
    if not isinstance(data, str) and value_is_null(data):
        return data

    import binascii
    import base64

    has_hex_prefix = data.startswith('\\x')
    if not (force_hex or has_hex_prefix):
        return base64.b64decode(data)

    # Drop the two-character prefix before decoding the hex digits.
    hex_digits = data[2:] if has_hex_prefix else data
    return binascii.unhexlify(hex_digits)

Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.

Parameters
  • data (Optional[str]): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with '\x').
  • force_hex (bool = False): If True, treat the input string as hex-encoded. If data does not begin with the prefix '\x', set force_hex to True. This will still strip the leading '\x' prefix if present.
Returns
  • The original bytes used to produce the encoded string data.
def deserialize_base64(data: str) -> bytes:
def deserialize_base64(data: str) -> bytes:
    """
    Decode a base64-encoded string back into its original bytestring.
    """
    from base64 import b64decode
    return b64decode(data)

Return the original bytestring from the given base64-encoded string.

def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Optional[str]:
def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> Union[str, None]:
    """
    Hex-encode the given bytes for PostgreSQL's `BYTEA` type.

    Parameters
    ----------
    data: bytes
        The bytes to encode. Null values which are not `bytes` are returned unchanged.

    with_prefix: bool, default True
        If `True`, prepend the `'\\x'` prefix to the hex digits.

    Returns
    -------
    The hex string, optionally prefixed with `'\\x'`.
    """
    if not isinstance(data, bytes) and value_is_null(data):
        return data

    from binascii import hexlify
    hex_digits = hexlify(data).decode('utf-8')
    return '\\x' + hex_digits if with_prefix else hex_digits

Return the given bytes as a hex string for PostgreSQL's BYTEA type.

def serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into an ISO format string suitable for JSON.
    A `'Z'` suffix is appended when the value carries no `tzinfo`.
    Objects without an `isoformat()` method yield `None`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not hasattr(dt, 'isoformat'):
        return None

    is_naive = getattr(dt, 'tzinfo', None) is None
    iso_str = dt.isoformat()
    return iso_str + 'Z' if is_naive else iso_str

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def serialize_date(d: datetime.date) -> Optional[str]:
def serialize_date(d: date) -> Union[str, None]:
    """
    Return the ISO representation of a date object,
    or `None` if the value has no `isoformat()` method.
    """
    if not hasattr(d, 'isoformat'):
        return None
    return d.isoformat()

Serialize a date object into its ISO representation.

def json_serialize_value(x: Any, default_to_str: bool = True) -> Optional[str]:
def json_serialize_value(x: Any, default_to_str: bool = True) -> Union[str, None]:
    """
    Convert the given value into something JSON can represent.
    Handles pipes, connectors, datetimes, dates, bytes, decimals, and geometries.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, fall back to `str(x)` when `x` is not one of the handled types.
        Otherwise return `x` unchanged.

    Returns
    -------
    A serialized version of `x`, `None` for null values, or `x` itself.
    """
    # Pipes and connectors are represented by their `meta` attribute.
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    # Objects exposing `tzinfo` are treated as datetimes;
    # objects which only expose `isoformat()` are treated as dates.
    has_tzinfo = hasattr(x, 'tzinfo')
    if has_tzinfo or hasattr(x, 'isoformat'):
        return serialize_datetime(x) if has_tzinfo else serialize_date(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    # Detect shapely objects by type name.
    if 'shapely' in str(type(x)):
        return serialize_geometry(x)

    if value_is_null(x):
        return None

    # Containers are dumped compactly, serializing nested values with this function.
    if isinstance(x, (dict, list, tuple)):
        return json.dumps(x, default=json_serialize_value, separators=(',', ':'))

    if default_to_str:
        return str(x)
    return x

Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

Parameters
  • x (Any): The value to serialize.
  • default_to_str (bool, default True): If True, return a string of x if x is not a designated type. Otherwise return x.
Returns
  • A serialized version of x, or x.
def get_geometry_type_srid( dtype: str = 'geometry', default_type: str = 'geometry', default_srid: int = 4326) -> Union[Tuple[str, int], Tuple[str, NoneType]]:
def get_geometry_type_srid(
    dtype: str = 'geometry',
    default_type: str = 'geometry',
    default_srid: int = 4326,
) -> Union[Tuple[str, int], Tuple[str, None]]:
    """
    Parse a geometry `dtype` string and return a tuple in the form (type, SRID).

    Parameters
    ----------
    dtype: str, default 'geometry'
        The dtype to parse, optionally with a modifier
        (e.g. `geometry[MultiLineString, 4326]`).
        Parentheses are accepted in place of brackets (PostGIS syntax),
        and `key=value` parts are reduced to their value.
        You may specify a supported `shapely` geometry type and an SRID in the dtype modifier:

        - `Point`
        - `LineString`
        - `LinearRing`
        - `Polygon`
        - `MultiPoint`
        - `MultiLineString`
        - `MultiPolygon`
        - `GeometryCollection`

    default_type: str, default 'geometry'
        The geometry type to return when the modifier does not name one.

    default_srid: int, default 4326
        The SRID to return when the modifier does not contain an integer.

    Returns
    -------
    A tuple in the form (type, SRID).
    Defaults to `(default_type, default_srid)`.
    NOTE(review): the geometry type is returned exactly as written in `dtype`
    (no case normalization is applied here).

    Examples
    --------
    >>> from meerschaum.utils.dtypes import get_geometry_type_srid
    >>> get_geometry_type_srid()
    ('geometry', 4326)
    >>> get_geometry_type_srid('geometry[]')
    ('geometry', 4326)
    >>> get_geometry_type_srid('geometry[Point, 0]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0, Point]')
    ('Point', 0)
    >>> get_geometry_type_srid('geometry[0]')
    ('geometry', 0)
    >>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
    ('MULTILINESTRING', 4326)
    >>> get_geometry_type_srid('geography')
    ('geometry', 4326)
    >>> get_geometry_type_srid('geography[POINT]')
    ('POINT', 4326)
    """
    from meerschaum.utils.misc import is_int
    ### NOTE: PostGIS syntax must also be parsed.
    bracketed_dtype = dtype.replace('(', '[').replace(')', ']')
    bare_dtype = bracketed_dtype.split('[', maxsplit=1)[0]
    modifier = bracketed_dtype.split(bare_dtype, maxsplit=1)[-1].lstrip('[').rstrip(']')
    if not modifier:
        return default_type, default_srid

    # Keep only the value of `key=value` parts and trim whitespace.
    raw_parts = [part.split('=')[-1].strip() for part in modifier.split(',')]
    parts_casted = [int(part) if is_int(part) else part for part in raw_parts]

    # The first integer is the SRID; the first string is the geometry type.
    srid = next(
        (part for part in parts_casted if isinstance(part, int)),
        default_srid,
    )
    geometry_type = next(
        (part for part in parts_casted if isinstance(part, str)),
        default_type,
    )
    return geometry_type, srid

Given the specified geometry dtype, return a tuple in the form (type, SRID).

Parameters
  • dtype (str, default 'geometry'): The dtype to parse, optionally with a specific geometry syntax (e.g. geometry[MultiLineString, 4326]). You may specify a supported shapely geometry type and an SRID in the dtype modifier:

    • Point
    • LineString
    • LinearRing
    • Polygon
    • MultiPoint
    • MultiLineString
    • MultiPolygon
    • GeometryCollection
Returns
  • A tuple in the form (type, SRID).
  • Defaults to (default_type, default_srid).
Examples
>>> from meerschaum.utils.dtypes import get_geometry_type_srid
>>> get_geometry_type_srid()
('geometry', 4326)
>>> get_geometry_type_srid('geometry[]')
('geometry', 4326)
>>> get_geometry_type_srid('geometry[Point, 0]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0, Point]')
('Point', 0)
>>> get_geometry_type_srid('geometry[0]')
('geometry', 0)
>>> get_geometry_type_srid('geometry[MULTILINESTRING, 4326]')
('MULTILINESTRING', 4326)
>>> get_geometry_type_srid('geography')
('geometry', 4326)
>>> get_geometry_type_srid('geography[POINT]')
('POINT', 4326)
def get_current_timestamp( precision_unit: str = 'microsecond', precision_interval: int = 1, round_to: str = 'down', as_pandas: bool = False, as_int: bool = False, _now: Union[datetime.datetime, int, NoneType] = None) -> 'Union[datetime, pd.Timestamp, int]':
 962def get_current_timestamp(
 963    precision_unit: str = _STATIC_CONFIG['dtypes']['datetime']['default_precision_unit'],
 964    precision_interval: int = 1,
 965    round_to: str = 'down',
 966    as_pandas: bool = False,
 967    as_int: bool = False,
 968    _now: Union[datetime, int, None] = None,
 969) -> 'Union[datetime, pd.Timestamp, int]':
 970    """
 971    Return the current UTC timestamp to nanosecond precision.
 972
 973    Parameters
 974    ----------
 975    precision_unit: str, default 'us'
 976        The precision of the timestamp to be returned.
 977        Valid values are the following:
 978            - `ns` / `nanosecond`
 979            - `us` / `microsecond`
 980            - `ms` / `millisecond`
 981            - `s` / `sec` / `second`
 982            - `m` / `min` / `minute`
 983            - `h` / `hr` / `hour`
 984            - `d` / `day`
 985
 986    precision_interval: int, default 1
 987        Round the timestamp to the `precision_interval` units.
 988        For example, `precision='minute'` and `precision_interval=15` will round to 15-minute intervals.
 989        Note: `precision_interval` must be 1 when `precision='nanosecond'`.
 990
 991    round_to: str, default 'down'
 992        The direction to which to round the timestamp.
 993        Available options are `down`, `up`, and `closest`.
 994
 995    as_pandas: bool, default False
 996        If `True`, return a Pandas Timestamp.
 997        This is always true if `unit` is `nanosecond`.
 998
 999    as_int: bool, default False
1000        If `True`, return the timestamp to an integer.
1001        Overrides `as_pandas`.
1002
1003    Returns
1004    -------
1005    A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
1006
1007    Examples
1008    --------
1009    >>> get_current_timestamp('ns')
1010    Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
1011    >>> get_current_timestamp('ms')
1012    Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
1013    """
1014    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
1015    if true_precision_unit not in MRSM_PRECISION_UNITS_SCALARS:
1016        from meerschaum.utils.misc import items_str
1017        raise ValueError(
1018            f"Unknown precision unit '{precision_unit}'. "
1019            "Accepted values are "
1020            f"{items_str(list(MRSM_PRECISION_UNITS_SCALARS) + list(MRSM_PRECISION_UNITS_ALIASES))}."
1021        )
1022
1023    if not as_int:
1024        as_pandas = as_pandas or true_precision_unit == 'nanosecond'
1025    pd = mrsm.attempt_import('pandas', lazy=False) if as_pandas else None
1026
1027    if true_precision_unit == 'nanosecond':
1028        if precision_interval != 1:
1029            warn("`precision_interval` must be 1 for nanosecond precision.")
1030        now_ts = time.time_ns() if not isinstance(_now, int) else _now
1031        if as_int:
1032            return now_ts
1033        return pd.to_datetime(now_ts, unit='ns', utc=True)
1034
1035    now = datetime.now(timezone.utc) if not isinstance(_now, datetime) else _now
1036    delta = timedelta(**{true_precision_unit + 's': precision_interval})
1037    rounded_now = round_time(now, delta, to=round_to)
1038
1039    if as_int:
1040        return int(rounded_now.timestamp() * MRSM_PRECISION_UNITS_SCALARS[true_precision_unit])
1041
1042    ts_val = (
1043        pd.to_datetime(rounded_now, utc=True)
1044        if as_pandas
1045        else rounded_now
1046    )
1047
1048    if not as_pandas:
1049        return ts_val
1050
1051    as_unit_precisions = ('microsecond', 'millisecond', 'second')
1052    if true_precision_unit not in as_unit_precisions:
1053        return ts_val
1054
1055    return ts_val.as_unit(MRSM_PRECISION_UNITS_ABBREVIATIONS[true_precision_unit])

Return the current UTC timestamp, rounded to the given precision unit.

Parameters
  • precision_unit (str, default 'us'): The precision of the timestamp to be returned. Valid values are the following: - ns / nanosecond - us / microsecond - ms / millisecond - s / sec / second - m / min / minute - h / hr / hour - d / day
  • precision_interval (int, default 1): Round the timestamp to the precision_interval units. For example, precision_unit='minute' and precision_interval=15 will round to 15-minute intervals. Note: precision_interval must be 1 when precision_unit='nanosecond'.
  • round_to (str, default 'down'): The direction to which to round the timestamp. Available options are down, up, and closest.
  • as_pandas (bool, default False): If True, return a Pandas Timestamp. This is always true if precision_unit is nanosecond.
  • as_int (bool, default False): If True, return the timestamp to an integer. Overrides as_pandas.
Returns
  • A Pandas Timestamp, datetime object, or integer with precision to the provided unit.
Examples
>>> get_current_timestamp('ns')
Timestamp('2025-07-17 17:59:16.423644369+0000', tz='UTC')
>>> get_current_timestamp('ms', as_pandas=True)
Timestamp('2025-07-17 17:59:16.424000+0000', tz='UTC')
def is_dtype_special(type_: str) -> bool:
def is_dtype_special(type_: str) -> bool:
    """
    Return whether a dtype should be treated as a special Meerschaum dtype.
    This is not the same as a Meerschaum alias
    (aliases are resolved to their true dtype before checking).
    """
    true_type = MRSM_ALIAS_DTYPES.get(type_, type_)

    special_dtypes = (
        'uuid',
        'json',
        'bytes',
        'numeric',
        'datetime',
        'geometry',
        'geography',
        'date',
        'bool',
    )
    if true_type in special_dtypes:
        return True

    # Dtypes which compare equal to datetimes or dates are special.
    if are_dtypes_equal(true_type, 'datetime') or are_dtypes_equal(true_type, 'date'):
        return True

    # Dtypes with modifiers (e.g. `geometry[Point, 4326]`) are matched by prefix.
    return true_type.startswith(('numeric', 'bool', 'geometry', 'geography'))

Return whether a dtype should be treated as a special Meerschaum dtype. This is not the same as a Meerschaum alias.

def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
def get_next_precision_unit(precision_unit: str, decrease: bool = True) -> str:
    """
    Return the precision unit adjacent to the given one.
    Units are ordered by their scalar in `MRSM_PRECISION_UNITS_SCALARS`.

    Parameters
    ----------
    precision_unit: str
        The precision string (`'nanosecond'`, `'ms'`, etc.).

    decrease: bool, default True
        If `True` return the precision unit which is lower (e.g. `nanosecond` -> `microsecond`).
        If `False`, return the precision unit which is higher (e.g. `hour` -> `minute`).

    Returns
    -------
    A `precision` string which is lower or higher than the given precision unit.

    Raises
    ------
    ValueError
        If the unit is unknown or there is no unit in the requested direction.

    Examples
    --------
    >>> get_next_precision_unit('nanosecond')
    'microsecond'
    >>> get_next_precision_unit('ms')
    'second'
    >>> get_next_precision_unit('hour', decrease=False)
    'minute'
    """
    true_precision_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
    if not MRSM_PRECISION_UNITS_SCALARS.get(true_precision_unit, None):
        raise ValueError(f"Invalid precision unit '{precision_unit}'.")

    # Ascending scalar order, so moving left means decreasing precision.
    ordered_units = sorted(
        MRSM_PRECISION_UNITS_SCALARS,
        key=MRSM_PRECISION_UNITS_SCALARS.get,
    )
    step = -1 if decrease else 1
    target_index = ordered_units.index(true_precision_unit) + step
    if not 0 <= target_index < len(ordered_units):
        raise ValueError(f"No precision {'below' if decrease else 'above'} '{precision_unit}'.")

    return ordered_units[target_index]

Get the next precision string in order of value.

Parameters
  • precision_unit (str): The precision string ('nanosecond', 'ms', etc.).
  • decrease (bool, default True): If True, return the precision unit which is lower (e.g. nanosecond -> microsecond). If False, return the precision unit which is higher.
Returns
  • A precision string which is lower or higher than the given precision unit.
Examples
>>> get_next_precision_unit('nanosecond')
'microsecond'
>>> get_next_precision_unit('ms')
'second'
>>> get_next_precision_unit('hour', decrease=False)
'minute'
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.
    Multiples are counted from `datetime.min`, ignoring any timezone on `dt`;
    the returned value keeps the `tzinfo` of `dt`.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime (as a naive datetime).

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.
        A zero-length delta returns `dt` unchanged.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any value other than `'up'` or `'down'` rounds to the closest multiple.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    from decimal import Decimal, ROUND_HALF_UP, ROUND_DOWN, ROUND_UP

    def _as_microseconds(td: timedelta) -> int:
        whole_seconds = td.days * 86400 + td.seconds
        return whole_seconds * 1_000_000 + td.microseconds

    interval = timedelta(minutes=1) if date_delta is None else date_delta
    moment = datetime.now(timezone.utc).replace(tzinfo=None) if dt is None else dt

    interval_us = _as_microseconds(interval)
    if interval_us == 0:
        return moment

    # Measure the (timezone-stripped) datetime as microseconds elapsed since `datetime.min`.
    elapsed_us = _as_microseconds(moment.replace(tzinfo=None) - datetime.min)

    rounding_mode = (
        ROUND_DOWN if to == 'down'
        else ROUND_UP if to == 'up'
        else ROUND_HALF_UP
    )
    interval_dec = Decimal(interval_us)
    interval_count = (Decimal(elapsed_us) / interval_dec).to_integral_value(
        rounding=rounding_mode
    )

    # Shift the original value by the difference so that `tzinfo` is preserved.
    adjustment_us = int(interval_count * interval_dec) - elapsed_us
    return moment + timedelta(microseconds=adjustment_us)

Round a datetime object to a multiple of a timedelta.

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)