meerschaum.utils.dtypes
Utility functions for working with data types.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with data types.
"""

import traceback
import uuid
from datetime import timezone, datetime
from decimal import Decimal, Context, InvalidOperation, ROUND_HALF_UP

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Union, Any, Optional
from meerschaum.utils.warnings import warn

### Alternate spellings which resolve to a key of `MRSM_PD_DTYPES`.
MRSM_ALIAS_DTYPES: Dict[str, str] = {
    'decimal': 'numeric',
    'Decimal': 'numeric',
    'number': 'numeric',
    'jsonl': 'json',
    'JSON': 'json',
    'binary': 'bytes',
    'blob': 'bytes',
    'varbinary': 'bytes',
    'bytea': 'bytes',
    'guid': 'uuid',
    'UUID': 'uuid',
}
### Meerschaum dtypes and the Pandas dtype used for each (`None` maps to `'object'`).
MRSM_PD_DTYPES: Dict[Union[str, None], str] = {
    'json': 'object',
    'numeric': 'object',
    'uuid': 'object',
    'datetime': 'datetime64[ns, UTC]',
    'bool': 'bool[pyarrow]',
    'int': 'Int64',
    'int8': 'Int8',
    'int16': 'Int16',
    'int32': 'Int32',
    'int64': 'Int64',
    'str': 'string[python]',
    'bytes': 'object',
    None: 'object',
}


def to_pandas_dtype(dtype: str) -> str:
    """
    Cast a supported Meerschaum dtype to a Pandas dtype.
    """
    known_dtype = MRSM_PD_DTYPES.get(dtype, None)
    if known_dtype is not None:
        return known_dtype

    alias_dtype = MRSM_ALIAS_DTYPES.get(dtype, None)
    if alias_dtype is not None:
        return MRSM_PD_DTYPES[alias_dtype]

    if dtype.startswith('numeric'):
        return MRSM_PD_DTYPES['numeric']

    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
    ### treat it as a SQL db type.
    if dtype.split(' ')[0].isupper():
        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
        return get_pd_type_from_db_type(dtype)

    from meerschaum.utils.packages import attempt_import
    pandas = attempt_import('pandas', lazy=False)

    try:
        return str(pandas.api.types.pandas_dtype(dtype))
    except Exception:
        warn(
            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
            + f"{traceback.format_exc()}",
            stack=False,
        )
    return 'object'


def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    """
    if isinstance(ldtype, dict) and isinstance(rdtype, dict):
        lkeys = sorted([str(k) for k in ldtype.keys()])
        rkeys = sorted([str(k) for k in rdtype.keys()])
        ### NOTE(review): `zip()` stops at the shorter list,
        ### so surplus keys on either side are never compared.
        for lkey, rkey in zip(lkeys, rkeys):
            if lkey != rkey:
                return False
            ltype = ldtype[lkey]
            rtype = rdtype[rkey]
            if not are_dtypes_equal(ltype, rtype):
                return False
        return True

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ### Anything from the first '[' onward is dropped, e.g. 'datetime64[ns, UTC]' -> 'datetime64'.
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    if ldtype in MRSM_ALIAS_DTYPES:
        ldtype = MRSM_ALIAS_DTYPES[ldtype]

    if rdtype in MRSM_ALIAS_DTYPES:
        rdtype = MRSM_ALIAS_DTYPES[rdtype]

    json_dtypes = ('json', 'object')
    if ldtype in json_dtypes and rdtype in json_dtypes:
        return True

    numeric_dtypes = ('numeric', 'object')
    if ldtype in numeric_dtypes and rdtype in numeric_dtypes:
        return True

    uuid_dtypes = ('uuid', 'object')
    if ldtype in uuid_dtypes and rdtype in uuid_dtypes:
        return True

    bytes_dtypes = ('bytes', 'object')
    if ldtype in bytes_dtypes and rdtype in bytes_dtypes:
        return True

    if ldtype.lower() == rdtype.lower():
        return True

    ### Two dtypes match if each contains any of the datetime substrings.
    datetime_dtypes = ('datetime', 'timestamp')
    ldtype_found_dt_prefix = False
    rdtype_found_dt_prefix = False
    for dt_prefix in datetime_dtypes:
        ldtype_found_dt_prefix = (dt_prefix in ldtype.lower()) or ldtype_found_dt_prefix
        rdtype_found_dt_prefix = (dt_prefix in rdtype.lower()) or rdtype_found_dt_prefix
    if ldtype_found_dt_prefix and rdtype_found_dt_prefix:
        return True

    string_dtypes = ('str', 'string', 'object')
    if ldtype in string_dtypes and rdtype in string_dtypes:
        return True

    int_dtypes = ('int', 'int64', 'int32', 'int16', 'int8')
    if ldtype.lower() in int_dtypes and rdtype.lower() in int_dtypes:
        return True

    float_dtypes = ('float', 'float64', 'float32', 'float16', 'float128', 'double')
    if ldtype.lower() in float_dtypes and rdtype.lower() in float_dtypes:
        return True

    bool_dtypes = ('bool', 'boolean')
    if ldtype in bool_dtypes and rdtype in bool_dtypes:
        return True

    return False


def is_dtype_numeric(dtype: str) -> bool:
    """
    Determine whether a given `dtype` string
    should be considered compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    A bool indicating the dtype is compatible with `numeric`.
    """
    dtype_lower = dtype.lower()

    ### Substring (not exact) matches, so e.g. 'Int64' and 'float32' both qualify.
    acceptable_substrings = ('numeric', 'float', 'double', 'int')
    for substring in acceptable_substrings:
        if substring in dtype_lower:
            return True

    return False


def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
)-> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    """
    ### NOTE(review): the truthiness checks below also skip quantization when `scale` is `0`.
    if isinstance(value, Decimal):
        if quantize and precision and scale:
            return quantize_decimal(value, precision, scale)
        return value
    try:
        if value_is_null(value):
            return Decimal('NaN')

        dec = Decimal(str(value))
        if not quantize or not precision or not scale:
            return dec
        return quantize_decimal(dec, precision, scale)
    except Exception:
        return value


def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a UUID (`uuid4`).
    """
    if isinstance(value, uuid.UUID):
        return value
    try:
        return (
            uuid.UUID(str(value))
            if not value_is_null(value)
            else None
        )
    except Exception:
        return value


def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a bytestring.
    """
    if isinstance(value, bytes):
        return value
    try:
        return (
            deserialize_bytes_string(str(value))
            if not value_is_null(value)
            else None
        )
    except Exception:
        return value


def value_is_null(value: Any) -> bool:
    """
    Determine if a value is a null-like string.
    """
    return str(value).lower() in ('none', 'nan', 'na', 'nat', '', '<na>')


def none_if_null(value: Any) -> Any:
    """
    Return `None` if a value is a null-like string.
    """
    return (None if value_is_null(value) else value)


def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision.
    """
    ### Build a template such as `111.11` (precision=5, scale=2) to quantize against.
    precision_decimal = Decimal(('1' * (precision - scale)) + '.' + ('1' * scale))
    try:
        return x.quantize(precision_decimal, context=Context(prec=precision), rounding=ROUND_HALF_UP)
    except InvalidOperation:
        pass

    raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.")


def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.

    Returns
    -------
    A string of the input decimal or the input if not a Decimal.
    """
    if not isinstance(x, Decimal):
        return x

    if value_is_null(x):
        return None

    if quantize and scale and precision:
        x = quantize_decimal(x, precision, scale)

    ### The 'f' format avoids scientific notation.
    return f"{x:f}"


def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip timezone if `strip_utc` is `True`).
    """
    if dt is None:
        return None

    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        dt = dateutil_parser.parse(dt)

    ### Duck-typed check for Series-like objects.
    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')

    if dt_is_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        ### Return early when the series' timezone-awareness already matches `strip_utc`.
        if (
            pandas.api.types.is_datetime64_any_dtype(dt) and (
                (dt.dt.tz is not None and not strip_utc)
                or
                (dt.dt.tz is None and strip_utc)
            )
        ):
            return dt

        dt_series = to_datetime(dt, coerce_utc=False)
        if strip_utc:
            try:
                if dt_series.dt.tz is not None:
                    dt_series = dt_series.dt.tz_localize(None)
            except Exception:
                pass

        return dt_series

    if dt.tzinfo is None:
        if strip_utc:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    utc_dt = dt.astimezone(timezone.utc)
    if strip_utc:
        return utc_dt.replace(tzinfo=None)
    return utc_dt


def to_datetime(dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    except (pd.errors.OutOfBoundsDatetime, ValueError):
        pass

    ### Fall back to `dateutil`, leaving unparsable values untouched.
    def parse(x: Any) -> Any:
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    if dt_is_series:
        new_series = dt_val.apply(parse)
        if coerce_utc:
            return coerce_timezone(new_series)
        return new_series

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)


def serialize_bytes(data: bytes) -> str:
    """
    Return the given bytes as a base64-encoded string.
    """
    import base64
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return base64.b64encode(data).decode('utf-8')


def deserialize_bytes_string(data: str | None, force_hex: bool = False) -> bytes | None:
    """
    Given a serialized ASCII string of bytes data, return the original bytes.
    The input data may either be base64- or hex-encoded.

    Parameters
    ----------
    data: str | None
        The string to be deserialized into bytes.
        May be base64- or hex-encoded (prefixed with `'\\x'`).

    force_hex: bool = False
        If `True`, treat the input string as hex-encoded.
        If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`.
        This will still strip the leading `'\\x'` prefix if present.

    Returns
    -------
    The original bytes used to produce the encoded string `data`.
    """
    if not isinstance(data, str) and value_is_null(data):
        return data

    import binascii
    import base64

    is_hex = force_hex or data.startswith('\\x')

    if is_hex:
        if data.startswith('\\x'):
            data = data[2:]
        return binascii.unhexlify(data)

    return base64.b64decode(data)


def deserialize_base64(data: str) -> bytes:
    """
    Return the original bytestring from the given base64-encoded string.
    """
    import base64
    return base64.b64decode(data)


def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> str | None:
    """
    Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
    """
    import binascii
    if not isinstance(data, bytes) and value_is_null(data):
        return data
    return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')


def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None
    ### Only timezone-naive values receive the 'Z' suffix.
    tz_suffix = 'Z' if dt.tzinfo is None else ''
    return dt.isoformat() + tz_suffix


def json_serialize_value(x: Any, default_to_str: bool = True) -> str:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    """
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    ### Anything exposing `tzinfo` is handed to `serialize_datetime()`.
    if hasattr(x, 'tzinfo'):
        return serialize_datetime(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    if value_is_null(x):
        return None

    return str(x) if default_to_str else x
def to_pandas_dtype(dtype: str) -> str:
    """
    Cast a supported Meerschaum dtype to a Pandas dtype.

    Parameters
    ----------
    dtype: str
        A Meerschaum dtype, an alias of one, a SQL database type
        (first word in all caps), or a dtype string Pandas can parse.

    Returns
    -------
    The name of the matching Pandas dtype.
    Falls back to `'object'` (with a warning) if Pandas rejects `dtype`.
    """
    ### Exact Meerschaum dtypes win, followed by their aliases.
    pd_dtype = MRSM_PD_DTYPES.get(dtype)
    if pd_dtype is not None:
        return pd_dtype

    canonical_dtype = MRSM_ALIAS_DTYPES.get(dtype)
    if canonical_dtype is not None:
        return MRSM_PD_DTYPES[canonical_dtype]

    ### Anything beginning with 'numeric' shares the plain numeric mapping.
    if dtype.startswith('numeric'):
        return MRSM_PD_DTYPES['numeric']

    ### NOTE: Kind of a hack, but if the first word of the given dtype is in all caps,
    ### treat it as a SQL db type.
    first_word = dtype.split(' ')[0]
    if first_word.isupper():
        from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
        return get_pd_type_from_db_type(dtype)

    from meerschaum.utils.packages import attempt_import
    pandas = attempt_import('pandas', lazy=False)

    ### Last resort: let Pandas itself parse the dtype string.
    try:
        resolved_dtype = str(pandas.api.types.pandas_dtype(dtype))
    except Exception:
        warn(
            f"Invalid dtype '{dtype}', will use 'object' instead:\n"
            + f"{traceback.format_exc()}",
            stack=False,
        )
        return 'object'
    return resolved_dtype
Cast a supported Meerschaum dtype to a Pandas dtype.
def are_dtypes_equal(
    ldtype: Union[str, Dict[str, str]],
    rdtype: Union[str, Dict[str, str]],
) -> bool:
    """
    Determine whether two dtype strings may be considered
    equivalent to avoid unnecessary conversions.

    Parameters
    ----------
    ldtype: Union[str, Dict[str, str]]
        The left dtype to compare.
        May also provide a dtypes dictionary.

    rdtype: Union[str, Dict[str, str]]
        The right dtype to compare.
        May also provide a dtypes dictionary.

    Returns
    -------
    A `bool` indicating whether the two dtypes are to be considered equivalent.
    Two dtypes dictionaries are equivalent only if they name exactly the same columns
    and every column's dtypes are equivalent.
    A dictionary is never equivalent to a single dtype.
    """
    ldtype_is_dict = isinstance(ldtype, dict)
    rdtype_is_dict = isinstance(rdtype, dict)

    if ldtype_is_dict and rdtype_is_dict:
        ### Key by the string form of each column so mixed key types line up.
        lcols = {str(col): typ for col, typ in ldtype.items()}
        rcols = {str(col): typ for col, typ in rdtype.items()}

        ### An extra column on either side is a difference
        ### (pairing the keys with `zip()` would silently ignore the surplus).
        if lcols.keys() != rcols.keys():
            return False

        return all(
            are_dtypes_equal(ltyp, rcols[col])
            for col, ltyp in lcols.items()
        )

    if ldtype_is_dict or rdtype_is_dict:
        ### Without this guard, the dictionary's repr would be substring-matched below.
        return False

    try:
        if ldtype == rdtype:
            return True
    except Exception:
        warn(f"Exception when comparing dtypes, returning False:\n{traceback.format_exc()}")
        return False

    ### Sometimes pandas dtype objects are passed.
    ### Drop everything from the first '[' (e.g. 'datetime64[ns, UTC]' -> 'datetime64').
    ldtype = str(ldtype).split('[', maxsplit=1)[0]
    rdtype = str(rdtype).split('[', maxsplit=1)[0]

    ldtype = MRSM_ALIAS_DTYPES.get(ldtype, ldtype)
    rdtype = MRSM_ALIAS_DTYPES.get(rdtype, rdtype)
    ldtype_lower, rdtype_lower = ldtype.lower(), rdtype.lower()

    ### Groups compared case-sensitively.
    ### 'object' appears in several groups, so it matches each of those dtypes.
    case_sensitive_groups = (
        ('json', 'object'),
        ('numeric', 'object'),
        ('uuid', 'object'),
        ('bytes', 'object'),
        ('str', 'string', 'object'),
        ('bool', 'boolean'),
    )
    for group in case_sensitive_groups:
        if ldtype in group and rdtype in group:
            return True

    if ldtype_lower == rdtype_lower:
        return True

    ### Datetime dtypes match on substrings (e.g. 'datetime64' and 'timestamp').
    datetime_substrings = ('datetime', 'timestamp')
    if (
        any(substring in ldtype_lower for substring in datetime_substrings)
        and any(substring in rdtype_lower for substring in datetime_substrings)
    ):
        return True

    ### Groups compared case-insensitively (e.g. 'Int64' and 'int32').
    case_insensitive_groups = (
        ('int', 'int64', 'int32', 'int16', 'int8'),
        ('float', 'float64', 'float32', 'float16', 'float128', 'double'),
    )
    for group in case_insensitive_groups:
        if ldtype_lower in group and rdtype_lower in group:
            return True

    return False
Determine whether two dtype strings may be considered equivalent to avoid unnecessary conversions.
Parameters
- ldtype (Union[str, Dict[str, str]]): The left dtype to compare. May also provide a dtypes dictionary.
- rdtype (Union[str, Dict[str, str]]): The right dtype to compare. May also provide a dtypes dictionary.
Returns
- A `bool` indicating whether the two dtypes are to be considered equivalent.
def is_dtype_numeric(dtype: str) -> bool:
    """
    Determine whether a given `dtype` string
    should be considered compatible with the Meerschaum dtype `numeric`.

    Parameters
    ----------
    dtype: str
        The pandas-like dtype string.

    Returns
    -------
    A bool indicating the dtype is compatible with `numeric`,
    i.e. its lowercased form contains 'numeric', 'float', 'double', or 'int'.
    """
    lowered = dtype.lower()
    return any(
        token in lowered
        for token in ('numeric', 'float', 'double', 'int')
    )
Determine whether a given `dtype` string should be considered compatible with the Meerschaum dtype `numeric`.
Parameters
- dtype (str): The pandas-like dtype string.
Returns
- A bool indicating the dtype is compatible with `numeric`.
def attempt_cast_to_numeric(
    value: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Given a value, attempt to coerce it into a numeric (Decimal).

    Parameters
    ----------
    value: Any
        The value to be cast to a Decimal.

    quantize: bool, default False
        If `True`, quantize the decimal to the specified precision and scale.

    precision: Optional[int], default None
        If `quantize` is `True`, use this precision.

    scale: Optional[int], default None
        If `quantize` is `True`, use this scale.
        A scale of `0` is valid and quantizes to whole numbers.

    Returns
    -------
    A `Decimal` if possible, or `value`.
    Null-like values become `Decimal('NaN')`.

    Raises
    ------
    ValueError
        If `value` is already a `Decimal` and `quantize_decimal()` cannot quantize it
        (that branch runs outside the `try` block).
    """
    ### `scale` must be compared against `None`: `0` is falsy but a legitimate scale.
    should_quantize = bool(quantize and precision and scale is not None)

    if isinstance(value, Decimal):
        if should_quantize:
            return quantize_decimal(value, precision, scale)
        return value

    try:
        if value_is_null(value):
            return Decimal('NaN')

        dec = Decimal(str(value))
        if not should_quantize:
            return dec
        return quantize_decimal(dec, precision, scale)
    except Exception:
        ### Best-effort: hand back the input untouched if it cannot be cast.
        return value
Given a value, attempt to coerce it into a numeric (Decimal).
Parameters
- value (Any): The value to be cast to a Decimal.
- quantize (bool, default False): If `True`, quantize the decimal to the specified precision and scale.
- precision (Optional[int], default None): If `quantize` is `True`, use this precision.
- scale (Optional[int], default None): If `quantize` is `True`, use this scale.
Returns
- A `Decimal` if possible, or `value`.
def attempt_cast_to_uuid(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a UUID (`uuid4`).

    Returns `value` itself if it is already a `uuid.UUID` or cannot be parsed,
    and `None` if it is null-like.
    """
    if isinstance(value, uuid.UUID):
        return value

    try:
        if value_is_null(value):
            return None
        return uuid.UUID(str(value))
    except Exception:
        ### Best-effort: leave unparsable values as they are.
        return value
Given a value, attempt to coerce it into a UUID (`uuid4`).
def attempt_cast_to_bytes(value: Any) -> Any:
    """
    Given a value, attempt to coerce it into a bytestring.

    Returns `value` itself if it is already `bytes` or cannot be deserialized,
    and `None` if it is null-like.
    """
    if isinstance(value, bytes):
        return value

    try:
        if value_is_null(value):
            return None
        return deserialize_bytes_string(str(value))
    except Exception:
        ### Best-effort: leave undecodable values as they are.
        return value
Given a value, attempt to coerce it into a bytestring.
def value_is_null(value: Any) -> bool:
    """
    Determine if a value is a null-like string.

    The value's lowercased string form is compared against the known null markers,
    so objects such as `None` or `float('nan')` count as null as well.
    """
    null_markers = ('none', 'nan', 'na', 'nat', '', '<na>')
    text = str(value).lower()
    return text in null_markers
Determine if a value is a null-like string.
def none_if_null(value: Any) -> Any:
    """
    Return `None` if a value is a null-like string.
    Otherwise return the value unchanged.
    """
    if value_is_null(value):
        return None
    return value
Return `None` if a value is a null-like string.
def quantize_decimal(x: Decimal, precision: int, scale: int) -> Decimal:
    """
    Quantize a given `Decimal` to a known scale and precision.

    Parameters
    ----------
    x: Decimal
        The `Decimal` to be quantized.

    precision: int
        The total number of significant digits.

    scale: int
        The number of significant digits after the decimal point.
        Negative scales are treated as `0`.

    Returns
    -------
    A `Decimal` quantized to the specified scale and precision
    (rounding half up).

    Raises
    ------
    ValueError
        If `x` cannot be represented with the given precision and scale.
        The underlying `decimal.InvalidOperation` is chained as the cause.
    """
    ### Only the exponent of the quantization target matters,
    ### so build `1E-scale` directly instead of formatting a string of digits.
    target = Decimal((0, (1,), -max(scale, 0)))
    try:
        return x.quantize(
            target,
            rounding=ROUND_HALF_UP,
            context=Context(prec=precision),
        )
    except InvalidOperation as e:
        raise ValueError(f"Cannot quantize value '{x}' to {precision=}, {scale=}.") from e
Quantize a given `Decimal` to a known scale and precision.
Parameters
- x (Decimal): The `Decimal` to be quantized.
- precision (int): The total number of significant digits.
- scale (int): The number of significant digits after the decimal point.
Returns
- A `Decimal` quantized to the specified scale and precision.
def serialize_decimal(
    x: Any,
    quantize: bool = False,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
) -> Any:
    """
    Return a quantized string of an input decimal.

    Parameters
    ----------
    x: Any
        The potential decimal to be serialized.

    quantize: bool, default False
        If `True`, quantize the incoming Decimal to the specified scale and precision
        before serialization.

    precision: Optional[int], default None
        The precision of the decimal to be quantized.

    scale: Optional[int], default None
        The scale of the decimal to be quantized.
        A scale of `0` is valid and quantizes to whole numbers.

    Returns
    -------
    A string of the input decimal (never in scientific notation),
    `None` if the decimal is a NaN, or the input if not a Decimal.
    """
    if not isinstance(x, Decimal):
        return x

    ### Every NaN flavor (`NaN`, `-NaN`, `sNaN`) lacks a numeric string form.
    if x.is_nan():
        return None

    ### `scale` must be compared against `None`: `0` is falsy but a legitimate scale.
    if quantize and precision and scale is not None:
        x = quantize_decimal(x, precision, scale)

    return f"{x:f}"
Return a quantized string of an input decimal.
Parameters
- x (Any): The potential decimal to be serialized.
- quantize (bool, default False): If `True`, quantize the incoming Decimal to the specified scale and precision before serialization.
- precision (Optional[int], default None): The precision of the decimal to be quantized.
- scale (Optional[int], default None): The scale of the decimal to be quantized.
Returns
- A string of the input decimal or the input if not a Decimal.
def coerce_timezone(
    dt: Any,
    strip_utc: bool = False,
) -> Any:
    """
    Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`,
    return a UTC timestamp (strip timezone if `strip_utc` is `True`).

    Parameters
    ----------
    dt: Any
        The value to coerce. `None` and `int` values are returned as-is,
        and strings are first parsed with `dateutil`.

    strip_utc: bool, default False
        If `True`, return timezone-naive values.

    Returns
    -------
    The coerced value (timezone-aware in UTC unless `strip_utc` is `True`).
    """
    if dt is None:
        return None

    if isinstance(dt, int):
        return dt

    if isinstance(dt, str):
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        dt = dateutil_parser.parse(dt)

    ### Duck-typed check for Series-like objects.
    dt_is_series = hasattr(dt, 'dtype') and hasattr(dt, '__module__')

    if dt_is_series:
        pandas = mrsm.attempt_import('pandas', lazy=False)

        ### Return early when the series' timezone-awareness already matches `strip_utc`.
        ### NOTE(review): an aware series is returned without checking that its timezone is UTC.
        if (
            pandas.api.types.is_datetime64_any_dtype(dt) and (
                (dt.dt.tz is not None and not strip_utc)
                or
                (dt.dt.tz is None and strip_utc)
            )
        ):
            return dt

        ### `coerce_utc=False` skips `to_datetime()`'s own call back into this function.
        dt_series = to_datetime(dt, coerce_utc=False)
        if strip_utc:
            ### Best-effort: if the timezone cannot be dropped, return the series as parsed.
            try:
                if dt_series.dt.tz is not None:
                    dt_series = dt_series.dt.tz_localize(None)
            except Exception:
                pass

        return dt_series

    ### Naive scalars are labeled as UTC without shifting the clock time.
    if dt.tzinfo is None:
        if strip_utc:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    ### Aware scalars are converted to UTC before the timezone is (optionally) dropped.
    utc_dt = dt.astimezone(timezone.utc)
    if strip_utc:
        return utc_dt.replace(tzinfo=None)
    return utc_dt
Given a `datetime`, pandas `Timestamp` or `Series` of `Timestamp`, return a UTC timestamp (strip timezone if `strip_utc` is `True`).
def to_datetime(dt_val: Any, as_pydatetime: bool = False, coerce_utc: bool = True) -> Any:
    """
    Wrap `pd.to_datetime()` and add support for out-of-bounds values.

    Parameters
    ----------
    dt_val: Any
        The scalar or Series-like value to parse.

    as_pydatetime: bool, default False
        If `True`, call `.to_pydatetime()` on the result of `pd.to_datetime()`.
        This flag is not consulted on the `dateutil` fallback path.

    coerce_utc: bool, default True
        If `True`, pass values parsed by the `dateutil` fallback through `coerce_timezone()`.

    Returns
    -------
    The parsed value. Values which `dateutil` cannot parse are returned unchanged.
    """
    pandas, dateutil_parser = mrsm.attempt_import('pandas', 'dateutil.parser', lazy=False)
    is_dask = 'dask' in getattr(dt_val, '__module__', '')
    dd = mrsm.attempt_import('dask.dataframe') if is_dask else None
    dt_is_series = hasattr(dt_val, 'dtype') and hasattr(dt_val, '__module__')
    pd = pandas if dd is None else dd

    try:
        new_dt_val = pd.to_datetime(dt_val, utc=True, format='ISO8601')
        if as_pydatetime:
            return new_dt_val.to_pydatetime()
        return new_dt_val
    ### Resolve the exception class from `pandas` itself:
    ### `pd` may be `dask.dataframe` here, so `pd.errors` is not guaranteed to exist.
    except (pandas.errors.OutOfBoundsDatetime, ValueError):
        pass

    ### Fall back to `dateutil`, leaving unparsable values untouched.
    def parse(x: Any) -> Any:
        try:
            return dateutil_parser.parse(x)
        except Exception:
            return x

    if dt_is_series:
        new_series = dt_val.apply(parse)
        if coerce_utc:
            return coerce_timezone(new_series)
        return new_series

    new_dt_val = parse(dt_val)
    if not coerce_utc:
        return new_dt_val
    return coerce_timezone(new_dt_val)
Wrap `pd.to_datetime()` and add support for out-of-bounds values.
def serialize_bytes(data: bytes) -> str:
    """
    Return the given bytes as a base64-encoded string.

    Null-like values which are not `bytes` are returned unchanged.
    """
    import base64

    is_bytes = isinstance(data, bytes)
    if not is_bytes and value_is_null(data):
        return data

    encoded = base64.b64encode(data)
    return encoded.decode('utf-8')
Return the given bytes as a base64-encoded string.
462def deserialize_bytes_string(data: str | None, force_hex: bool = False) -> bytes | None: 463 """ 464 Given a serialized ASCII string of bytes data, return the original bytes. 465 The input data may either be base64- or hex-encoded. 466 467 Parameters 468 ---------- 469 data: str | None 470 The string to be deserialized into bytes. 471 May be base64- or hex-encoded (prefixed with `'\\x'`). 472 473 force_hex: bool = False 474 If `True`, treat the input string as hex-encoded. 475 If `data` does not begin with the prefix `'\\x'`, set `force_hex` to `True`. 476 This will still strip the leading `'\\x'` prefix if present. 477 478 Returns 479 ------- 480 The original bytes used to produce the encoded string `data`. 481 """ 482 if not isinstance(data, str) and value_is_null(data): 483 return data 484 485 import binascii 486 import base64 487 488 is_hex = force_hex or data.startswith('\\x') 489 490 if is_hex: 491 if data.startswith('\\x'): 492 data = data[2:] 493 return binascii.unhexlify(data) 494 495 return base64.b64decode(data)
Given a serialized ASCII string of bytes data, return the original bytes. The input data may either be base64- or hex-encoded.
Parameters
- data (str | None): The string to be deserialized into bytes. May be base64- or hex-encoded (prefixed with `'\x'`).
- force_hex (bool = False): If `True`, treat the input string as hex-encoded. If `data` does not begin with the prefix `'\x'`, set `force_hex` to `True`. This will still strip the leading `'\x'` prefix if present.
Returns
- The original bytes used to produce the encoded string `data`.
def deserialize_base64(data: str) -> bytes:
    """
    Return the original bytestring from the given base64-encoded string.
    """
    from base64 import b64decode
    decoded = b64decode(data)
    return decoded
Return the original bytestring from the given base64-encoded string.
506def encode_bytes_for_bytea(data: bytes, with_prefix: bool = True) -> str | None: 507 """ 508 Return the given bytes as a hex string for PostgreSQL's `BYTEA` type. 509 """ 510 import binascii 511 if not isinstance(data, bytes) and value_is_null(data): 512 return data 513 return ('\\x' if with_prefix else '') + binascii.hexlify(data).decode('utf-8')
Return the given bytes as a hex string for PostgreSQL's `BYTEA` type.
def serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Timezone-naive datetimes receive a `'Z'` suffix;
    timezone-aware datetimes keep the offset emitted by `isoformat()`.

    Parameters
    ----------
    dt: datetime
        The datetime to serialize.

    Returns
    -------
    The ISO format string, or `None` if `dt` is not a `datetime`
    or is a null marker.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None

    ### Null markers such as pandas' `NaT` subclass `datetime` but never equal themselves.
    ### Without this guard they would serialize to strings like 'NaTZ'.
    if dt != dt:
        return None

    tz_suffix = 'Z' if dt.tzinfo is None else ''
    return dt.isoformat() + tz_suffix
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def json_serialize_value(x: Any, default_to_str: bool = True) -> Any:
    """
    Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.

    Parameters
    ----------
    x: Any
        The value to serialize.

    default_to_str: bool, default True
        If `True`, return a string of `x` if x is not a designated type.
        Otherwise return x.

    Returns
    -------
    A serialized version of x, or x.
    Pipes and connectors serialize to their `meta` attribute,
    and null-like values serialize to `None`.
    """
    if isinstance(x, (mrsm.Pipe, mrsm.connectors.Connector)):
        return x.meta

    ### Only true datetimes take the datetime path.
    ### Other objects exposing `tzinfo` (e.g. `datetime.time`) would be
    ### turned into `None` by `serialize_datetime()` and lose their value.
    if isinstance(x, datetime):
        return serialize_datetime(x)

    if isinstance(x, bytes):
        return serialize_bytes(x)

    if isinstance(x, Decimal):
        return serialize_decimal(x)

    if value_is_null(x):
        return None

    return str(x) if default_to_str else x
Serialize the given value to a JSON value. Accounts for datetimes, bytes, decimals, etc.
Parameters
- x (Any): The value to serialize.
- default_to_str (bool, default True): If `True`, return a string of `x` if x is not a designated type. Otherwise return x.
Returns
- A serialized version of x, or x.