meerschaum.utils.dtypes

Utility functions for working with data types.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utility functions for working with data types.
  7"""
  8
  9import traceback
 10import uuid
 11from datetime import timezone, datetime
 12from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP
 13
 14import meerschaum as mrsm
 15from meerschaum.utils.typing import Dict, Union, Any, Optional
 16from meerschaum.utils.warnings import warn
 17
 18MRSM_ALIAS_DTYPES: Dict[str, str] = {
 19    'decimal': 'numeric',
 20    'Decimal': 'numeric',
 21    'number': 'numeric',
 22    'jsonl': 'json',
 23    'JSON': 'json',
 24    'binary': 'bytes',
 25    'blob': 'bytes',
 26    'varbinary': 'bytes',
 27    'bytea': 'bytes',
 28    'guid': 'uuid',
 29    'UUID': 'uuid',
 30}
 31MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
 32    'json': 'object',
 33    'numeric': 'object',
 34    'uuid': 'object',
 35    'datetime': 'datetime64[ns, UTC]',
 36    'bool': 'bool[pyarrow]',
 37    'int': 'Int64',
 38    'int8': 'Int8',
 39    'int16': 'Int16',
 40    'int32': 'Int32',
 41    'int64': 'Int64',
 42    'str': 'string[python]',
 43    'bytes': 'object',
 44    None: 'object',
 45}
 46
 47
 48def to_pandas_dtype(dtype: str) -> str:
 49    """
 50    Cast a supported Meerschaum dtype to a Pandas dtype.
 51    """
 52    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
 53    if known_dtype is not None:
 54        return known_dtype
 55
 56    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
 57    if alias_dtype is not None:
 58        return MRSM_PD_DTYPES[alias_dtype]
 59
 60    if dtype.startswith('numeric'):
 61        return MRSM_PD_DTYPES['numeric']
 62
 63    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
 64    ### treat it as a SQL db type.
 65    if dtype.split(' ')[0].isupper():
 66        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
 67        return get_pd_type_from_db_type(dtype)
 68
 69    from meerschaum.utils.packages import attempt_import
 70    pandas = attempt_import('pandas', lazy=False)
 71
 72    try:
 73        return str(pandas.api.types.pandas_dtype(dtype))
 74    except Exception:
 75        warn(
 76            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
 77            + f"{traceback.format_exc()}",
 78            stack=False,
 79        )
 80    return 'object'
 81
 82
 83def are_dtypes_equal(
 84    ldtype: Union[str, Dict[str, str]],
 85    rdtype: Union[str, Dict[str, str]],
 86) -> bool:
 87    """
 88    Determine whether two dtype strings may be considered
 89    equivalent to avoid unnecessary conversions.
 90
 91    Parameters
 92    ----------
 93    ldtype: Union[str, Dict[str, str]]
 94        The left dtype to compare.
 95        May also provide a dtypes dictionary.
 96
 97    rdtype: Union[str, Dict[str, str]]
 98        The right dtype to compare.
 99        May also provide a dtypes dictionary.
100
101    Returns
102    -------
103    A `bool` indicating whether the two dtypes are to be considered equivalent.
104    """
105    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
106        lkeys = sorted([str(k) for k in ldtype.keys()])
107        rkeys = sorted([str(k) for k in rdtype.keys()])
108        for lkey, rkey in zip(lkeys, rkeys):
109            if lkey != rkey:
110                return False
111            ltype = ldtype[lkey]
112            rtype = rdtype[rkey]
113            if not are_dtypes_equal(ltype, rtype):
114                return False
115        return True
116
117    try:
118        if ldtype == rdtype:
119            return True
120    except Exception:
121        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
122        return False
123
124    ### Sometimes pandas dtype objects are passed.
125    ldtype = str(ldtype).split('[', maxsplit=1)[0]
126    rdtype = str(rdtype).split('[', maxsplit=1)[0]
127
128    if ldtype in MRSM_ALIAS_DTYPES:
129        ldtype = MRSM_ALIAS_DTYPES[ldtype]
130
131    if rdtype in MRSM_ALIAS_DTYPES:
132        rdtype = MRSM_ALIAS_DTYPES[rdtype]
133
134    json_dtypes = ('json', 'object')
135    if ldtype in json_dtypes and rdtype in json_dtypes:
136        return True
137
138    numeric_dtypes = ('numeric', 'object')
139    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
140        return True
141
142    uuid_dtypes = ('uuid', 'object')
143    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
144        return True
145
146    bytes_dtypes = ('bytes', 'object')
147    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
148        return True
149
150    if ldtype.lower() == rdtype.lower():
151        return True
152
153    datetime_dtypes = ('datetime', 'timestamp')
154    ldtype_found_dt_prefix = False
155    rdtype_found_dt_prefix = False
156    for dt_prefix in datetime_dtypes:
157        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
158        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
159    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
160        return True
161
162    string_dtypes = ('str', 'string', 'object')
163    if ldtype in string_dtypes and rdtype in string_dtypes:
164        return True
165
166    int_dtypes = ('int', 'int64', 'int32', 'int16', 'int8')
167    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
168        return True
169
170    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
171    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
172        return True
173
174    bool_dtypes = ('bool', 'boolean')
175    if ldtype in bool_dtypes and rdtype in bool_dtypes:
176        return True
177
178    return False
179
180
def is_dtype_numeric(dtype: str) -> bool:
    """
    Check whether a `dtype` string is compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    `True` if the lowercased dtype contains any of the substrings
    `numeric`, `float`, `double` or `int`, otherwise `False`.
    """
    lowered = dtype.lower()
    return any(marker in lowered for marker in ('numeric', 'float', 'double', 'int'))
203
204
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.
        Quantization only happens when both `precision` and `scale` are provided.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale. A scale of `0` is honored.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    Null-like values are returned as `Decimal('NaN')`.

    Raises
    ------
    ValueError
        If `value` is already a `Decimal` and cannot be quantized
        to the requested precision and scale.
    """
    ### Test `scale` against `None` rather than for truthiness:
    ### a scale of 0 (e.g. `NUMERIC(10, 0)`) is a legitimate request.
    should_quantize = bool(quantize and precision and scale is not None)

    if isinstance(value, Decimal):
        if should_quantize:
            return quantize_decimal(value, precision, scale)
        return value

    ### Best-effort: anything which cannot be parsed is handed back untouched.
    try:
        if value_is_null(value):
            return Decimal('NaN')

        dec = Decimal(str(value))
        if should_quantize:
            return quantize_decimal(dec, precision, scale)
        return dec
    except Exception:
        return value
246
247
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Try to convert a value into a `uuid.UUID`.

    An existing `UUID` is returned unchanged, a null-like value becomes `None`,
    and anything whose string form cannot be parsed is returned as-is.
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        if value_is_null(value):
            return None
        return uuid.UUID(str(value))
    except Exception:
        return value
262
263
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Try to convert a value into a bytestring.

    Existing `bytes` are returned unchanged, a null-like value becomes `None`,
    and anything whose string form cannot be deserialized is returned as-is.
    """
    if isinstance(value, bytes):
        return value
    try:
        if value_is_null(value):
            return None
        return deserialize_bytes_string(str(value))
    except Exception:
        return value
278
279
def value_is_null(value: Any) -> bool:
    """
    Check whether the string form of a value is null-like.

    The comparison is case-insensitive and matches
    `none`, `nan`, `na`, `nat`, `<na>` and the empty string.
    """
    null_strings = {'none', 'nan', 'na', 'nat', '', '<na>'}
    return str(value).lower() in null_strings
285
286
def none_if_null(value: Any) -> Any:
    """
    Replace a null-like value (per `value_is_null()`) with `None`;
    return any other value unchanged.
    """
    if value_is_null(value):
        return None
    return value
292
293
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision,
    rounded with `ROUND_HALF_UP`.

    Raises
    ------
    ValueError
        If `x` cannot be represented with the given precision and scale.
        The underlying `InvalidOperation` is attached as the cause.
    """
    try:
        ### `quantize()` takes the target exponent from this template,
        ### e.g. precision 5 and scale 2 yield `111.11`.
        ### Built inside the `try` so a malformed template is also reported as `ValueError`.
        precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
        return x.quantize(
            precision_decimal,
            context=Context(prec=precision),
            rounding=ROUND_HALF_UP,
        )
    except InvalidOperation as e:
        raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.") from e
320
321
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.
        Quantization only happens when both `precision` and `scale` are provided.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized. A scale of `0` is honored.

    Returns
    -------
    A fixed-point string of the input decimal, `None` for a null-like Decimal
    (e.g. `NaN`), or the input itself if it is not a Decimal.

    Raises
    ------
    ValueError
        If quantization is requested and the value does not fit
        the given precision and scale.
    """
    if not isinstance(x, Decimal):
        return x

    if value_is_null(x):
        return None

    ### Test `scale` against `None` rather than for truthiness:
    ### a scale of 0 (e.g. `NUMERIC(10, 0)`) is a legitimate request.
    if quantize and precision and scale is not None:
        x = quantize_decimal(x, precision, scale)

    ### The `f` format avoids scientific notation in the output.
    return f"{x:f}"
360
361
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (or a timezone-naive one if `strip_utc` is `True`).

    `None` and integers are returned unchanged, and strings are parsed with `dateutil` first.
    Naive scalar datetimes are assumed to already be in UTC.
    """
    if dt is None or isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dt = mrsm.attempt_import('dateutil.parser').parse(dt)

    ### Anything exposing both `dtype` and `__module__` is handled as a series.
    looks_like_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')

    if not looks_like_series:
        if dt.tzinfo is not None:
            as_utc = dt.astimezone(timezone.utc)
            return as_utc.replace(tzinfo=None) if strip_utc else as_utc
        return dt if strip_utc else dt.replace(tzinfo=timezone.utc)

    pandas = mrsm.attempt_import('pandas', lazy=False)

    if pandas.api.types.is_datetime64_any_dtype(dt):
        ### Nothing to do if the series is already aware (and should stay aware)
        ### or already naive (and should be naive).
        series_is_aware = dt.dt.tz is not None
        if series_is_aware != bool(strip_utc):
            return dt

    converted = to_datetime(dt, coerce_utc=False)
    if strip_utc:
        ### Best-effort: leave the series alone if it has no `.dt` accessor.
        try:
            if converted.dt.tz is not None:
                converted = converted.dt.tz_localize(None)
        except Exception:
            pass

    return converted
413
414
def to_datetime(dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The scalar or series to be parsed.
        Objects from a `dask` module are parsed with `dask.dataframe`.

    as_pydatetime: bool, default False
        If `True`, call `to_pydatetime()` on the result of `pd.to_datetime()`.
        Not applied when falling back to `dateutil`.

    coerce_utc: bool, default True
        If `True`, pass values parsed by the `dateutil` fallback through `coerce_timezone()`.
        The `pd.to_datetime()` path always parses with `utc=True`.

    Returns
    -------
    The parsed value. In the fallback path, values which cannot be parsed are left as-is.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    ### Look the exception up on `pandas` itself: `pd` may be `dask.dataframe`,
    ### and a failed attribute lookup here would mask the original error.
    except (pandas.errors.OutOfBoundsDatetime, ValueError):
        pass

    def parse(x: Any) -> Any:
        ### Best-effort parse of a single value; unparsable values pass through.
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    if dt_is_series:
        new_series = dt_val.apply(parse)
        if coerce_utc:
            return coerce_timezone(new_series)
        return new_series

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)
449
450
def serialize_bytes(data: bytes) -> str:
    """
    Encode bytes as a base64 string (UTF-8 decoded).
    Null-like values which are not `bytes` are returned unchanged.
    """
    from base64 import b64encode
    if isinstance(data, bytes) or not value_is_null(data):
        return b64encode(data).decode('utf-8')
    return data
459
460
461def deserialize_bytes_string(data: str | None, force_hex: bool = False) -> bytes | None:
462    """
463    Given a serialized ASCII string of bytes data, return the original bytes.
464    The input data may either be base64- or hex-encoded.
465
466    Parameters
467    ----------
468    data: str | None
469        The string to be deserialized into bytes.
470        May be base64- or hex-encoded (prefixed with `'\\x'`).
471
472    force_hex: bool = False
473        If `True`, treat the input string as hex-encoded.
474        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
475        This will still strip the leading `'\\x'` prefix if present.
476
477    Returns
478    -------
479    The original bytes used to produce the encoded string `data`.
480    """
481    if not isinstance(data, str) and value_is_null(data):
482        return data
483
484    import binascii
485    import base64
486
487    is_hex = force_hex or data.startswith('\\x')
488
489    if is_hex:
490        if data.startswith('\\x'):
491            data = data[2:]
492        return binascii.unhexlify(data)
493
494    return base64.b64decode(data)
495
496
def deserialize_base64(data: str) -> bytes:
    """
    Decode a base64-encoded string back into its original bytestring.
    """
    from base64 import b64decode
    return b64decode(data)
503
504
505def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> str | None:
506    """
507    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
508    """
509    import binascii
510    if not isinstance(data, bytes) and value_is_null(data):
511        return data
512    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
513
514
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into an ISO format string for JSON.
    Naive datetimes get a `Z` suffix; aware datetimes keep their own offset.
    Anything which is not a `datetime` yields `None`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None
    iso_text = dt.isoformat()
    if dt.tzinfo is None:
        return iso_text + 'Z'
    return iso_text
531
532
def json_serialize_value(x: Any, default_to_str: bool = True) -> Any:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x:
    the `meta` of a pipe or connector, an ISO string for datetimes,
    a base64 string for bytes, a string for Decimals and `None` for null-like values.
    """
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    ### Only true datetimes go through `serialize_datetime()`.
    ### Checking for a `tzinfo` attribute also matched `datetime.time` objects,
    ### which `serialize_datetime()` turns into `None`, silently dropping the value.
    if isinstance(x, datetime):
        return serialize_datetime(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    if value_is_null(x):
        return None

    return str(x) if default_to_str else x
### Alternate spellings accepted for Meerschaum dtypes, mapped to their canonical names.
MRSM_ALIAS_DTYPES: Dict[str, str] = {
    **dict.fromkeys(('decimal', 'Decimal', 'number'), 'numeric'),
    **dict.fromkeys(('jsonl', 'JSON'), 'json'),
    **dict.fromkeys(('binary', 'blob', 'varbinary', 'bytea'), 'bytes'),
    **dict.fromkeys(('guid', 'UUID'), 'uuid'),
}

### Canonical Meerschaum dtypes (and `None`) mapped to the Pandas dtype strings used for them.
MRSM_PD_DTYPES: Dict[Optional[str], str] = {
    ### These dtypes are stored in `object` columns.
    **dict.fromkeys(('json', 'numeric', 'uuid'), 'object'),
    'datetime': 'datetime64[ns, UTC]',
    'bool': 'bool[pyarrow]',
    ### A bare `int` maps to the same dtype as `int64`.
    'int': 'Int64',
    **{f'int{bits}': f'Int{bits}' for bits in (8, 16, 32, 64)},
    'str': 'string[python]',
    'bytes': 'object',
    None: 'object',
}
def to_pandas_dtype(dtype: str) -> str:
def to_pandas_dtype(dtype: str) -> str:
    """
    Translate a Meerschaum dtype into the Pandas dtype string used to store it.

    Parameters
    ----------
    dtype: str
        A Meerschaum dtype, one of its aliases, a `numeric...` dtype,
        a SQL type (first word in all caps), or a Pandas dtype string.

    Returns
    -------
    The Pandas dtype as a string.
    If the dtype cannot be resolved, a warning is emitted and `'object'` is returned.
    """
    if dtype in MRSM_PD_DTYPES:
        return MRSM_PD_DTYPES[dtype]

    canonical_dtype = MRSM_ALIAS_DTYPES.get(dtype)
    if canonical_dtype is not None:
        return MRSM_PD_DTYPES[canonical_dtype]

    if dtype.startswith('numeric'):
        return MRSM_PD_DTYPES['numeric']

    ### NOTE: Heuristic: an all-caps first word means the dtype is a SQL database type.
    first_word, *_ = dtype.split(' ')
    if first_word.isupper():
        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
        return get_pd_type_from_db_type(dtype)

    from meerschaum.utils.packages import attempt_import
    pandas = attempt_import('pandas', lazy=False)

    try:
        pandas_dtype_name = str(pandas.api.types.pandas_dtype(dtype))
    except Exception:
        warn(
            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
            + f"{traceback.format_exc()}",
            stack=False,
        )
    else:
        return pandas_dtype_name
    return 'object'

Cast a supported Meerschaum dtype to a Pandas dtype.

def are_dtypes_equal( ldtype: Union[str, Dict[str, str]], rdtype: Union[str, Dict[str, str]]) -> bool:
 84def are_dtypes_equal(
 85    ldtype: Union[str, Dict[str, str]],
 86    rdtype: Union[str, Dict[str, str]],
 87) -> bool:
 88    """
 89    Determine whether two dtype strings may be considered
 90    equivalent to avoid unnecessary conversions.
 91
 92    Parameters
 93    ----------
 94    ldtype: Union[str, Dict[str, str]]
 95        The left dtype to compare.
 96        May also provide a dtypes dictionary.
 97
 98    rdtype: Union[str, Dict[str, str]]
 99        The right dtype to compare.
100        May also provide a dtypes dictionary.
101
102    Returns
103    -------
104    A `bool` indicating whether the two dtypes are to be considered equivalent.
105    """
106    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
107        lkeys = sorted([str(k) for k in ldtype.keys()])
108        rkeys = sorted([str(k) for k in rdtype.keys()])
109        for lkey, rkey in zip(lkeys, rkeys):
110            if lkey != rkey:
111                return False
112            ltype = ldtype[lkey]
113            rtype = rdtype[rkey]
114            if not are_dtypes_equal(ltype, rtype):
115                return False
116        return True
117
118    try:
119        if ldtype == rdtype:
120            return True
121    except Exception:
122        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
123        return False
124
125    ### Sometimes pandas dtype objects are passed.
126    ldtype = str(ldtype).split('[', maxsplit=1)[0]
127    rdtype = str(rdtype).split('[', maxsplit=1)[0]
128
129    if ldtype in MRSM_ALIAS_DTYPES:
130        ldtype = MRSM_ALIAS_DTYPES[ldtype]
131
132    if rdtype in MRSM_ALIAS_DTYPES:
133        rdtype = MRSM_ALIAS_DTYPES[rdtype]
134
135    json_dtypes = ('json', 'object')
136    if ldtype in json_dtypes and rdtype in json_dtypes:
137        return True
138
139    numeric_dtypes = ('numeric', 'object')
140    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
141        return True
142
143    uuid_dtypes = ('uuid', 'object')
144    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
145        return True
146
147    bytes_dtypes = ('bytes', 'object')
148    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
149        return True
150
151    if ldtype.lower() == rdtype.lower():
152        return True
153
154    datetime_dtypes = ('datetime', 'timestamp')
155    ldtype_found_dt_prefix = False
156    rdtype_found_dt_prefix = False
157    for dt_prefix in datetime_dtypes:
158        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
159        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
160    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
161        return True
162
163    string_dtypes = ('str', 'string', 'object')
164    if ldtype in string_dtypes and rdtype in string_dtypes:
165        return True
166
167    int_dtypes = ('int', 'int64', 'int32', 'int16', 'int8')
168    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
169        return True
170
171    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
172    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
173        return True
174
175    bool_dtypes = ('bool', 'boolean')
176    if ldtype in bool_dtypes and rdtype in bool_dtypes:
177        return True
178
179    return False

Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.

Parameters
  • ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
  • rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
  • A bool indicating whether the two dtypes are to be considered equivalent.
def is_dtype_numeric(dtype: str) -> bool:
def is_dtype_numeric(dtype: str) -> bool:
    """
    Check whether a `dtype` string is compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    `True` if the lowercased dtype contains any of the substrings
    `numeric`, `float`, `double` or `int`, otherwise `False`.
    """
    lowered = dtype.lower()
    return any(marker in lowered for marker in ('numeric', 'float', 'double', 'int'))

Determine whether a given dtype string should be considered compatible with the Meerschaum dtype numeric.

Parameters
  • dtype (str): The pandas-like dtype string.
Returns
  • A bool indicating the dtype is compatible with numeric.
def attempt_cast_to_numeric( value: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.
        Quantization only happens when both `precision` and `scale` are provided.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale. A scale of `0` is honored.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    Null-like values are returned as `Decimal('NaN')`.

    Raises
    ------
    ValueError
        If `value` is already a `Decimal` and cannot be quantized
        to the requested precision and scale.
    """
    ### Test `scale` against `None` rather than for truthiness:
    ### a scale of 0 (e.g. `NUMERIC(10, 0)`) is a legitimate request.
    should_quantize = bool(quantize and precision and scale is not None)

    if isinstance(value, Decimal):
        if should_quantize:
            return quantize_decimal(value, precision, scale)
        return value

    ### Best-effort: anything which cannot be parsed is handed back untouched.
    try:
        if value_is_null(value):
            return Decimal('NaN')

        dec = Decimal(str(value))
        if should_quantize:
            return quantize_decimal(dec, precision, scale)
        return dec
    except Exception:
        return value

Given a value, attempt to coerce it into a numeric (Decimal).

Parameters
  • value (Any): The value to be cast to a Decimal.
  • quantize (bool, default False): If True, quantize the decimal to the specified precision and scale.
  • precision (Optional[int], default None): If quantize is True, use this precision.
  • scale (Optional[int], default None): If quantize is True, use this scale.
Returns
  • A Decimal if possible, or value.
def attempt_cast_to_uuid(value: Any) -> Any:
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Try to convert a value into a `uuid.UUID`.

    An existing `UUID` is returned unchanged, a null-like value becomes `None`,
    and anything whose string form cannot be parsed is returned as-is.
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        if value_is_null(value):
            return None
        return uuid.UUID(str(value))
    except Exception:
        return value

Given a value, attempt to coerce it into a UUID (uuid.UUID).

def attempt_cast_to_bytes(value: Any) -> Any:
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Try to convert a value into a bytestring.

    Existing `bytes` are returned unchanged, a null-like value becomes `None`,
    and anything whose string form cannot be deserialized is returned as-is.
    """
    if isinstance(value, bytes):
        return value
    try:
        if value_is_null(value):
            return None
        return deserialize_bytes_string(str(value))
    except Exception:
        return value

Given a value, attempt to coerce it into a bytestring.

def value_is_null(value: Any) -> bool:
def value_is_null(value: Any) -> bool:
    """
    Check whether the string form of a value is null-like.

    The comparison is case-insensitive and matches
    `none`, `nan`, `na`, `nat`, `<na>` and the empty string.
    """
    null_strings = {'none', 'nan', 'na', 'nat', '', '<na>'}
    return str(value).lower() in null_strings

Determine if a value is a null-like string.

def none_if_null(value: Any) -> Any:
def none_if_null(value: Any) -> Any:
    """
    Replace a null-like value (per `value_is_null()`) with `None`;
    return any other value unchanged.
    """
    if value_is_null(value):
        return None
    return value

Return None if a value is a null-like string.

def quantize_decimal(x: decimal.Decimal, precision: int, scale: int) -> decimal.Decimal:
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision,
    rounded with `ROUND_HALF_UP`.

    Raises
    ------
    ValueError
        If `x` cannot be represented with the given precision and scale.
        The underlying `InvalidOperation` is attached as the cause.
    """
    try:
        ### `quantize()` takes the target exponent from this template,
        ### e.g. precision 5 and scale 2 yield `111.11`.
        ### Built inside the `try` so a malformed template is also reported as `ValueError`.
        precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
        return x.quantize(
            precision_decimal,
            context=Context(prec=precision),
            rounding=ROUND_HALF_UP,
        )
    except InvalidOperation as e:
        raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.") from e

Quantize a given Decimal to a known scale and precision.

Parameters
  • x (Decimal): The Decimal to be quantized.
  • precision (int): The total number of significant digits.
  • scale (int): The number of significant digits after the decimal point.
Returns
  • A Decimal quantized to the specified scale and precision.
def serialize_decimal( x: Any, quantize: bool = False, precision: Optional[int] = None, scale: Optional[int] = None) -> Any:
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.
        Quantization only happens when both `precision` and `scale` are provided.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized. A scale of `0` is honored.

    Returns
    -------
    A fixed-point string of the input decimal, `None` for a null-like Decimal
    (e.g. `NaN`), or the input itself if it is not a Decimal.

    Raises
    ------
    ValueError
        If quantization is requested and the value does not fit
        the given precision and scale.
    """
    if not isinstance(x, Decimal):
        return x

    if value_is_null(x):
        return None

    ### Test `scale` against `None` rather than for truthiness:
    ### a scale of 0 (e.g. `NUMERIC(10, 0)`) is a legitimate request.
    if quantize and precision and scale is not None:
        x = quantize_decimal(x, precision, scale)

    ### The `f` format avoids scientific notation in the output.
    return f"{x:f}"

Return a quantized string of an input decimal.

Parameters
  • x (Any): The potential decimal to be serialized.
  • quantize (bool, default False): If True, quantize the incoming Decimal to the specified scale and precision before serialization.
  • precision (Optional[int], default None): The precision of the decimal to be quantized.
  • scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
  • A string of the input decimal or the input if not a Decimal.
def coerce_timezone(dt: Any, strip_utc: bool = False) -> Any:
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (or a timezone-naive one if `strip_utc` is `True`).

    `None` and integers are returned unchanged, and strings are parsed with `dateutil` first.
    Naive scalar datetimes are assumed to already be in UTC.
    """
    if dt is None or isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dt = mrsm.attempt_import('dateutil.parser').parse(dt)

    ### Anything exposing both `dtype` and `__module__` is handled as a series.
    looks_like_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')

    if not looks_like_series:
        if dt.tzinfo is not None:
            as_utc = dt.astimezone(timezone.utc)
            return as_utc.replace(tzinfo=None) if strip_utc else as_utc
        return dt if strip_utc else dt.replace(tzinfo=timezone.utc)

    pandas = mrsm.attempt_import('pandas', lazy=False)

    if pandas.api.types.is_datetime64_any_dtype(dt):
        ### Nothing to do if the series is already aware (and should stay aware)
        ### or already naive (and should be naive).
        series_is_aware = dt.dt.tz is not None
        if series_is_aware != bool(strip_utc):
            return dt

    converted = to_datetime(dt, coerce_utc=False)
    if strip_utc:
        ### Best-effort: leave the series alone if it has no `.dt` accessor.
        try:
            if converted.dt.tz is not None:
                converted = converted.dt.tz_localize(None)
        except Exception:
            pass

    return converted

Given a datetime, pandas Timestamp or Series of Timestamp, return a UTC timestamp (strip timezone if strip_utc is True).

def to_datetime(dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True) -> Any:
def to_datetime(dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The scalar or series-like value to convert.

    as_pydatetime: bool, default False
        If `True`, call `.to_pydatetime()` on the result of the vectorized conversion.
        Only applies when the vectorized conversion succeeds; the fallback path ignores it.
        NOTE(review): this is called directly on the result, so confirm the behavior
        for series input.

    coerce_utc: bool, default True
        Only used on the fallback path: if `True`, pass the parsed value(s)
        through `coerce_timezone()` before returning.

    Returns
    -------
    The converted value. The vectorized path always requests UTC (`utc=True`).
    On the fallback path, values which `dateutil` cannot parse are returned unchanged.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    # Always resolve the exception class from pandas itself: `pd` may be
    # `dask.dataframe`, which is not guaranteed to expose an `errors` namespace,
    # and a failed lookup here would mask the original error with an `AttributeError`.
    except (pandas.errors.OutOfBoundsDatetime, ValueError):
        pass

    def parse(x: Any) -> Any:
        # Fall back to `dateutil`, leaving unparseable values as they are.
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    if dt_is_series:
        new_series = dt_val.apply(parse)
        if coerce_utc:
            return coerce_timezone(new_series)
        return new_series

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)

Wrap pd.to_datetime() and add support for out-of-bounds values.

def serialize_bytes(data: bytes) -> str:
def serialize_bytes(data: bytes) -> str:
    """
    Encode the given bytes as a base64 string (UTF-8 decoded).

    Null values which are not `bytes` are returned unchanged.
    """
    from base64 import b64encode

    is_bytes = isinstance(data, bytes)
    if not is_bytes and value_is_null(data):
        return data

    encoded = b64encode(data)
    return encoded.decode('utf-8')

Return the given bytes as a base64-encoded string.

def deserialize_bytes_string(data: str | None, force_hex: bool = False) -> bytes | None:
462def deserialize_bytes_string(data: str | None, force_hex: bool = False) -> bytes | None:
463    """
464    Given a serialized ASCII string of bytes data, return the original bytes.
465    The input data may either be base64- or hex-encoded.
466
467    Parameters
468    ----------
469    data: str | None
470        The string to be deserialized into bytes.
471        May be base64- or hex-encoded (prefixed with `'\\x'`).
472
473    force_hex: bool = False
474        If `True`, treat the input string as hex-encoded.
475        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
476        This will still strip the leading `'\\x'` prefix if present.
477
478    Returns
479    -------
480    The original bytes used to produce the encoded string `data`.
481    """
482    if not isinstance(data, str) and value_is_null(data):
483        return data
484
485    import binascii
486    import base64
487
488    is_hex = force_hex or data.startswith('\\x')
489
490    if is_hex:
491        if data.startswith('\\x'):
492            data = data[2:]
493        return binascii.unhexlify(data)
494
495    return base64.b64decode(data)

Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.

Parameters
  • data (str | None): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with '\x').
  • force_hex (bool = False): If True, treat the input string as hex-encoded. If data does not begin with the prefix '\x', set force_hex to True. This will still strip the leading '\x' prefix if present.
Returns
  • The original bytes used to produce the encoded string data.
def deserialize_base64(data: str) -> bytes:
def deserialize_base64(data: str) -> bytes:
    """
    Decode a base64-encoded string back into its original bytestring.
    """
    from base64 import b64decode

    decoded = b64decode(data)
    return decoded

Return the original bytestring from the given base64-encoded string.

def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> str | None:
506def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> str | None:
507    """
508    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
509    """
510    import binascii
511    if not isinstance(data, bytes) and value_is_null(data):
512        return data
513    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')

Return the given bytes as a hex string for PostgreSQL's BYTEA type.

def serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Parameters
    ----------
    dt: datetime
        The datetime to serialize.

    Returns
    -------
    The ISO 8601 string for `dt`. Timezone-naive datetimes get a trailing `'Z'`;
    timezone-aware datetimes keep the offset emitted by `isoformat()`.
    Returns `None` if `dt` is not a `datetime`, so this function may be passed
    directly as the `default` argument to `json.dumps()`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None
    # Aware datetimes already carry their offset in the ISO string.
    tz_suffix = 'Z' if dt.tzinfo is None else ''
    return dt.isoformat() + tz_suffix

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def json_serialize_value(x: Any, default_to_str: bool = True) -> str:
def json_serialize_value(x: Any, default_to_str: bool = True) -> Any:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    The result is not always a string: pipes and connectors return their `meta`
    attribute, null values return `None`, and unrecognized values are returned
    unchanged when `default_to_str` is `False`.
    """
    # Pipes and connectors are represented by their `meta` attribute.
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    # Duck-typed datetime check: anything exposing `tzinfo` is handed to `serialize_datetime()`.
    if hasattr(x, 'tzinfo'):
        return serialize_datetime(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    # Nulls are only checked after the typed branches above.
    if value_is_null(x):
        return None

    return str(x) if default_to_str else x

Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

Parameters
  • x (Any): The value to serialize.
  • default_to_str (bool, default True): If True, return a string of x if x is not a designated type. Otherwise return x.
Returns
  • A serialized version of x, or x.