meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9
  10import os
  11import sys
  12import functools
  13import pathlib
  14from datetime import timedelta, datetime
  15
  16from meerschaum.utils.typing import (
  17    Union,
  18    Any,
  19    Callable,
  20    Optional,
  21    List,
  22    Dict,
  23    SuccessTuple,
  24    Iterable,
  25    PipesDict,
  26    Tuple,
  27    TYPE_CHECKING,
  28)
  29if TYPE_CHECKING:
  30    import collections
  31
  32__pdoc__: Dict[str, bool] = {
  33    'to_pandas_dtype': False,
  34    'filter_unseen_df': False,
  35    'add_missing_cols_to_df': False,
  36    'parse_df_datetimes': False,
  37    'df_from_literal': False,
  38    'get_json_cols': False,
  39    'get_unhashable_cols': False,
  40    'enforce_dtypes': False,
  41    'get_datetime_bound_from_df': False,
  42    'df_is_chunk_generator': False,
  43    'choices_docstring': False,
  44    '_get_subaction_names': False,
  45    'is_pipe_registered': False,
  46    'replace_pipes_in_dict': False,
  47    'round_time': False,
  48}
  49
  50
  51def add_method_to_class(
  52    func: Callable[[Any], Any],
  53    class_def: 'Class',
  54    method_name: Optional[str] = None,
  55    keep_self: Optional[bool] = None,
  56) -> Callable[[Any], Any]:
  57    """
  58    Add function `func` to class `class_def`.
  59
  60    Parameters
  61    ----------
  62    func: Callable[[Any], Any]
  63        Function to be added as a method of the class
  64
  65    class_def: Class
  66        Class to be modified.
  67
  68    method_name: Optional[str], default None
  69        New name of the method. None will use func.__name__ (default).
  70
  71    Returns
  72    -------
  73    The modified function object.
  74
  75    """
  76    from functools import wraps
  77
  78    is_class = isinstance(class_def, type)
  79    
  80    @wraps(func)
  81    def wrapper(self, *args, **kw):
  82        return func(*args, **kw)
  83
  84    if method_name is None:
  85        method_name = func.__name__
  86
  87    setattr(class_def, method_name, (
  88            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  89        )
  90    )
  91
  92    return func
  93
  94
  95def generate_password(length: int = 12) -> str:
  96    """
  97    Generate a secure password of given length.
  98
  99    Parameters
 100    ----------
 101    length: int, default 12
 102        The length of the password.
 103
 104    Returns
 105    -------
 106    A random password string.
 107    """
 108    import secrets
 109    import string
 110    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))
 111 
 112
 113def is_int(s: str) -> bool:
 114    """
 115    Check if string is an int.
 116
 117    Parameters
 118    ----------
 119    s: str
 120        The string to be checked.
 121        
 122    Returns
 123    -------
 124    A bool indicating whether the string was able to be cast to an integer.
 125
 126    """
 127    try:
 128        return float(s).is_integer()
 129    except Exception:
 130        return False
 131
 132
 133def is_uuid(s: str) -> bool:
 134    """
 135    Check if a string is a valid UUID.
 136
 137    Parameters
 138    ----------
 139    s: str
 140        The string to be checked.
 141
 142    Returns
 143    -------
 144    A bool indicating whether the string is a valid UUID.
 145    """
 146    import uuid
 147    try:
 148        uuid.UUID(str(s))
 149        return True
 150    except Exception:
 151        return False
 152
 153
 154def string_to_dict(params_string: str) -> Dict[str, Any]:
 155    """
 156    Parse a string into a dictionary.
 157    
 158    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 159
 160    Parameters
 161    ----------
 162    params_string: str
 163        The string to be parsed.
 164        
 165    Returns
 166    -------
 167    The parsed dictionary.
 168
 169    Examples
 170    --------
 171    >>> string_to_dict("a:1,b:2") 
 172    {'a': 1, 'b': 2}
 173    >>> string_to_dict('{"a": 1, "b": 2}')
 174    {'a': 1, 'b': 2}
 175
 176    """
 177    if not params_string:
 178        return {}
 179
 180    import json
 181
 182    ### Kind of a weird edge case.
 183    ### In the generated compose file, there is some weird escaping happening,
 184    ### so the string to be parsed starts and ends with a single quote.
 185    if (
 186        isinstance(params_string, str)
 187        and len(params_string) > 4
 188        and params_string[1] == "{"
 189        and params_string[-2] == "}"
 190    ):
 191        return json.loads(params_string[1:-1])
 192
 193    if str(params_string).startswith('{'):
 194        return json.loads(params_string)
 195
 196    import ast
 197    params_dict = {}
 198    
 199    items = []
 200    bracket_level = 0
 201    brace_level = 0
 202    current_item = ''
 203    in_quotes = False
 204    quote_char = ''
 205    
 206    i = 0
 207    while i < len(params_string):
 208        char = params_string[i]
 209        
 210        if in_quotes:
 211            if char == quote_char and (i == 0 or params_string[i-1] != '\\'):
 212                in_quotes = False
 213        else:
 214            if char in ('"', "'"):
 215                in_quotes = True
 216                quote_char = char
 217            elif char == '[':
 218                bracket_level += 1
 219            elif char == ']':
 220                bracket_level -= 1
 221            elif char == '{':
 222                brace_level += 1
 223            elif char == '}':
 224                brace_level -= 1
 225            elif char == ',' and bracket_level == 0 and brace_level == 0:
 226                items.append(current_item)
 227                current_item = ''
 228                i += 1
 229                continue
 230        
 231        current_item += char
 232        i += 1
 233        
 234    if current_item:
 235        items.append(current_item)
 236
 237    for param in items:
 238        param = param.strip()
 239        if not param:
 240            continue
 241
 242        _keys = param.split(":", maxsplit=1)
 243        if len(_keys) != 2:
 244            continue
 245
 246        keys = _keys[:-1]
 247        try:
 248            val = ast.literal_eval(_keys[-1])
 249        except Exception:
 250            val = str(_keys[-1])
 251
 252        c = params_dict
 253        for _k in keys[:-1]:
 254            try:
 255                k = ast.literal_eval(_k)
 256            except Exception:
 257                k = str(_k)
 258            if k not in c:
 259                c[k] = {}
 260            c = c[k]
 261
 262        c[keys[-1]] = val
 263
 264    return params_dict
 265
 266
 267def to_simple_dict(doc: Dict[str, Any]) -> str:
 268    """
 269    Serialize a document dictionary in simple-dict format.
 270    """
 271    import json
 272    import ast
 273    from meerschaum.utils.dtypes import json_serialize_value
 274
 275    def serialize_value(value):
 276        if isinstance(value, str):
 277            try:
 278                evaluated = ast.literal_eval(value)
 279                if not isinstance(evaluated, str):
 280                    return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
 281                return value
 282            except (ValueError, SyntaxError, TypeError, MemoryError):
 283                return value
 284        
 285        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
 286
 287    return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items())
 288
 289
 290def parse_config_substitution(
 291    value: str,
 292    leading_key: str = 'MRSM',
 293    begin_key: str = '{',
 294    end_key: str = '}',
 295    delimeter: str = ':',
 296) -> List[Any]:
 297    """
 298    Parse Meerschaum substitution syntax
 299    E.g. MRSM{value1:value2} => ['value1', 'value2']
 300    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 301    """
 302    if not value.beginswith(leading_key):
 303        return value
 304
 305    return leading_key[len(leading_key):][len():-1].split(delimeter)
 306
 307
 308def edit_file(
 309    path: Union[pathlib.Path, str],
 310    default_editor: str = 'pyvim',
 311    debug: bool = False
 312) -> bool:
 313    """
 314    Open a file for editing.
 315
 316    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 317
 318    Parameters
 319    ----------
 320    path: Union[pathlib.Path, str]
 321        The path to the file to be edited.
 322
 323    default_editor: str, default 'pyvim'
 324        If `$EDITOR` is not set, use this instead.
 325        If `pyvim` is not installed, it will install it from PyPI.
 326
 327    debug: bool, default False
 328        Verbosity toggle.
 329
 330    Returns
 331    -------
 332    A bool indicating the file was successfully edited.
 333    """
 334    from subprocess import call
 335    from meerschaum.utils.debug import dprint
 336    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 337    try:
 338        EDITOR = os.environ.get('EDITOR', default_editor)
 339        if debug:
 340            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 341        rc = call([EDITOR, path])
 342    except Exception as e: ### can't open with default editors
 343        if debug:
 344            dprint(str(e))
 345            dprint('Failed to open file with system editor. Falling back to pyvim...')
 346        pyvim = attempt_import('pyvim', lazy=False)
 347        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 348    return rc == 0
 349
 350
 351def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 352    """
 353    Determine the columns and lines in the terminal.
 354    If they cannot be determined, return the default values (100 columns and 120 lines).
 355
 356    Parameters
 357    ----------
 358    default_cols: int, default 100
 359        If the columns cannot be determined, return this value.
 360
 361    default_lines: int, default 120
 362        If the lines cannot be determined, return this value.
 363
 364    Returns
 365    -------
 366    A tuple if integers for the columns and lines.
 367    """
 368    try:
 369        size = os.get_terminal_size()
 370        _cols, _lines = size.columns, size.lines
 371    except Exception:
 372        _cols, _lines = (
 373            int(os.environ.get('COLUMNS', str(default_cols))),
 374            int(os.environ.get('LINES', str(default_lines))),
 375        )
 376    return _cols, _lines
 377
 378
 379def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 380    """
 381    Iterate over a list in chunks.
 382    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 383
 384    Parameters
 385    ----------
 386    iterable: Iterable[Any]
 387        The iterable to iterate over in chunks.
 388        
 389    chunksize: int
 390        The size of chunks to iterate with.
 391        
 392    fillvalue: Optional[Any], default None
 393        If the chunks do not evenly divide into the iterable, pad the end with this value.
 394
 395    Returns
 396    -------
 397    A generator of tuples of size `chunksize`.
 398
 399    """
 400    from itertools import zip_longest
 401    args = [iter(iterable)] * chunksize
 402    return zip_longest(*args, fillvalue=fillvalue)
 403
 404def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 405    """
 406    Sort a dictionary's values and return a new dictionary.
 407
 408    Parameters
 409    ----------
 410    d: Dict[Any, Any]
 411        The dictionary to be sorted.
 412
 413    Returns
 414    -------
 415    A sorted dictionary.
 416
 417    Examples
 418    --------
 419    >>> sorted_dict({'b': 1, 'a': 2})
 420    {'b': 1, 'a': 2}
 421    >>> sorted_dict({'b': 2, 'a': 1})
 422    {'a': 1, 'b': 2}
 423
 424    """
 425    try:
 426        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 427    except Exception:
 428        return d
 429
 430def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
 431    """
 432    Convert the standard pipes dictionary into a list.
 433
 434    Parameters
 435    ----------
 436    pipes_dict: PipesDict
 437        The pipes dictionary to be flattened.
 438
 439    Returns
 440    -------
 441    A list of `Pipe` objects.
 442
 443    """
 444    pipes_list = []
 445    for ck in pipes_dict.values():
 446        for mk in ck.values():
 447            pipes_list += list(mk.values())
 448    return pipes_list
 449
 450
 451def timed_input(
 452    seconds: int = 10,
 453    timeout_message: str = "",
 454    prompt: str = "",
 455    icon: bool = False,
 456    **kw
 457) -> Union[str, None]:
 458    """
 459    Accept user input only for a brief period of time.
 460
 461    Parameters
 462    ----------
 463    seconds: int, default 10
 464        The number of seconds to wait.
 465
 466    timeout_message: str, default ''
 467        The message to print after the window has elapsed.
 468
 469    prompt: str, default ''
 470        The prompt to print during the window.
 471
 472    icon: bool, default False
 473        If `True`, print the configured input icon.
 474
 475
 476    Returns
 477    -------
 478    The input string entered by the user.
 479
 480    """
 481    import signal, time
 482
 483    class TimeoutExpired(Exception):
 484        """Raise this exception when the timeout is reached."""
 485
 486    def alarm_handler(signum, frame):
 487        raise TimeoutExpired
 488
 489    # set signal handler
 490    signal.signal(signal.SIGALRM, alarm_handler)
 491    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 492
 493    try:
 494        return input(prompt)
 495    except TimeoutExpired:
 496        return None
 497    except (EOFError, RuntimeError):
 498        try:
 499            print(prompt)
 500            time.sleep(seconds)
 501        except TimeoutExpired:
 502            return None
 503    finally:
 504        signal.alarm(0) # cancel alarm
 505
 506
 507def enforce_gevent_monkey_patch():
 508    """
 509    Check if gevent monkey patching is enabled, and if not, then apply patching.
 510    """
 511    from meerschaum.utils.packages import attempt_import
 512    import socket
 513    gevent, gevent_socket, gevent_monkey = attempt_import(
 514        'gevent', 'gevent.socket', 'gevent.monkey'
 515    )
 516    if not socket.socket is gevent_socket.socket:
 517        gevent_monkey.patch_all()
 518
 519def is_valid_email(email: str) -> Union['re.Match', None]:
 520    """
 521    Check whether a string is a valid email.
 522
 523    Parameters
 524    ----------
 525    email: str
 526        The string to be examined.
 527        
 528    Returns
 529    -------
 530    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 531
 532    Examples
 533    --------
 534    >>> is_valid_email('foo')
 535    >>> is_valid_email('foo@foo.com')
 536    <re.Match object; span=(0, 11), match='foo@foo.com'>
 537
 538    """
 539    import re
 540    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 541    return re.search(regex, email)
 542
 543
 544def string_width(string: str, widest: bool = True) -> int:
 545    """
 546    Calculate the width of a string, either by its widest or last line.
 547
 548    Parameters
 549    ----------
 550    string: str:
 551        The string to be examined.
 552        
 553    widest: bool, default True
 554        No longer used because `widest` is always assumed to be true.
 555
 556    Returns
 557    -------
 558    An integer for the text's visual width.
 559
 560    Examples
 561    --------
 562    >>> string_width('a')
 563    1
 564    >>> string_width('a\\nbc\\nd')
 565    2
 566
 567    """
 568    def _widest():
 569        words = string.split('\n')
 570        max_length = 0
 571        for w in words:
 572            length = len(w)
 573            if length > max_length:
 574                max_length = length
 575        return max_length
 576
 577    return _widest()
 578
 579def _pyinstaller_traverse_dir(
 580    directory: str,
 581    ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 582    include_dotfiles: bool = False
 583) -> list:
 584    """
 585    Recursively traverse a directory and return a list of its contents.
 586    """
 587    paths = []
 588    _directory = pathlib.Path(directory)
 589
 590    def _found_pattern(name: str):
 591        for pattern in ignore_patterns:
 592            if pattern.replace('/', os.path.sep) in str(name):
 593                return True
 594        return False
 595
 596    for root, dirs, files in os.walk(_directory):
 597        _root = str(root)[len(str(_directory.parent)):]
 598        if _root.startswith(os.path.sep):
 599            _root = _root[len(os.path.sep):]
 600        if _root.startswith('.') and not include_dotfiles:
 601            continue
 602        ### ignore certain patterns
 603        if _found_pattern(_root):
 604            continue
 605
 606        for filename in files:
 607            if filename.startswith('.') and not include_dotfiles:
 608                continue
 609            path = os.path.join(root, filename)
 610            if _found_pattern(path):
 611                continue
 612
 613            _path = str(path)[len(str(_directory.parent)):]
 614            if _path.startswith(os.path.sep):
 615                _path = _path[len(os.path.sep):]
 616            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 617
 618            paths.append((path, _path))
 619    return paths
 620
 621
 622def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
 623    """
 624    Get a value from a dictionary with a tuple of keys.
 625
 626    Parameters
 627    ----------
 628    d: Dict[Any, Any]
 629        The dictionary to search.
 630
 631    path: Tuple[Any, ...]
 632        The path of keys to traverse.
 633
 634    Returns
 635    -------
 636    The value from the end of the path.
 637    """
 638    return functools.reduce(lambda di, key: di[key], path, d)
 639
 640
 641def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
 642    """
 643    Set a value in a dictionary with a tuple of keys.
 644
 645    Parameters
 646    ----------
 647    d: Dict[Any, Any]
 648        The dictionary to search.
 649
 650    path: Tuple[Any, ...]
 651        The path of keys to traverse.
 652
 653    val: Any
 654        The value to set at the end of the path.
 655    """
 656    get_val_from_dict_path(d, path[:-1])[path[-1]] = val
 657
 658
 659def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 660    """
 661    Recursively replace passwords in a dictionary.
 662
 663    Parameters
 664    ----------
 665    d: Dict[str, Any]
 666        The dictionary to search through.
 667
 668    replace_with: str, default '*'
 669        The string to replace each character of the password with.
 670
 671    Returns
 672    -------
 673    Another dictionary where values to the keys `'password'`
 674    are replaced with `replace_with` (`'*'`).
 675
 676    Examples
 677    --------
 678    >>> replace_password({'a': 1})
 679    {'a': 1}
 680    >>> replace_password({'password': '123'})
 681    {'password': '***'}
 682    >>> replace_password({'nested': {'password': '123'}})
 683    {'nested': {'password': '***'}}
 684    >>> replace_password({'password': '123'}, replace_with='!')
 685    {'password': '!!!'}
 686
 687    """
 688    import copy
 689    _d = copy.deepcopy(d)
 690    for k, v in d.items():
 691        if isinstance(v, dict):
 692            _d[k] = replace_password(v)
 693        elif 'password' in str(k).lower():
 694            _d[k] = ''.join([replace_with for char in str(v)])
 695        elif str(k).lower() == 'uri':
 696            from meerschaum.connectors.sql import SQLConnector
 697            try:
 698                uri_params = SQLConnector.parse_uri(v)
 699            except Exception:
 700                uri_params = None
 701            if not uri_params:
 702                continue
 703            if not 'username' in uri_params or not 'password' in uri_params:
 704                continue
 705            _d[k] = v.replace(
 706                uri_params['username'] + ':' + uri_params['password'],
 707                uri_params['username'] + ':' + ''.join(
 708                    [replace_with for char in str(uri_params['password'])]
 709                )
 710            )
 711    return _d
 712
 713
 714def filter_arguments(
 715    func: Callable[[Any], Any],
 716    *args: Any,
 717    **kwargs: Any
 718) -> Tuple[Tuple[Any], Dict[str, Any]]:
 719    """
 720    Filter out unsupported positional and keyword arguments.
 721
 722    Parameters
 723    ----------
 724    func: Callable[[Any], Any]
 725        The function to inspect.
 726
 727    *args: Any
 728        Positional arguments to filter and pass to `func`.
 729
 730    **kwargs
 731        Keyword arguments to filter and pass to `func`.
 732
 733    Returns
 734    -------
 735    The `args` and `kwargs` accepted by `func`.
 736    """
 737    args = filter_positionals(func, *args)
 738    kwargs = filter_keywords(func, **kwargs)
 739    return args, kwargs
 740
 741
 742def filter_keywords(
 743    func: Callable[[Any], Any],
 744    **kw: Any
 745) -> Dict[str, Any]:
 746    """
 747    Filter out unsupported keyword arguments.
 748
 749    Parameters
 750    ----------
 751    func: Callable[[Any], Any]
 752        The function to inspect.
 753
 754    **kw: Any
 755        The arguments to be filtered and passed into `func`.
 756
 757    Returns
 758    -------
 759    A dictionary of keyword arguments accepted by `func`.
 760
 761    Examples
 762    --------
 763    ```python
 764    >>> def foo(a=1, b=2):
 765    ...     return a * b
 766    >>> filter_keywords(foo, a=2, b=4, c=6)
 767    {'a': 2, 'b': 4}
 768    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 769    8
 770    ```
 771
 772    """
 773    import inspect
 774    func_params = inspect.signature(func).parameters
 775    ### If the function has a **kw method, skip filtering.
 776    for param, _type in func_params.items():
 777        if '**' in str(_type):
 778            return kw
 779    return {k: v for k, v in kw.items() if k in func_params}
 780
 781
 782def filter_positionals(
 783    func: Callable[[Any], Any],
 784    *args: Any
 785) -> Tuple[Any]:
 786    """
 787    Filter out unsupported positional arguments.
 788
 789    Parameters
 790    ----------
 791    func: Callable[[Any], Any]
 792        The function to inspect.
 793
 794    *args: Any
 795        The arguments to be filtered and passed into `func`.
 796        NOTE: If the function signature expects more arguments than provided,
 797        the missing slots will be filled with `None`.
 798
 799    Returns
 800    -------
 801    A tuple of positional arguments accepted by `func`.
 802
 803    Examples
 804    --------
 805    ```python
 806    >>> def foo(a, b):
 807    ...     return a * b
 808    >>> filter_positionals(foo, 2, 4, 6)
 809    (2, 4)
 810    >>> foo(*filter_positionals(foo, 2, 4, 6))
 811    8
 812    ```
 813
 814    """
 815    import inspect
 816    from meerschaum.utils.warnings import warn
 817    func_params = inspect.signature(func).parameters
 818    acceptable_args: List[Any] = []
 819
 820    def _warn_invalids(_num_invalid):
 821        if _num_invalid > 0:
 822            warn(
 823                "Too few arguments were provided. "
 824                + f"{_num_invalid} argument"
 825                + ('s have ' if _num_invalid != 1 else " has ")
 826                + " been filled with `None`.",
 827            )
 828
 829    num_invalid: int = 0
 830    for i, (param, val) in enumerate(func_params.items()):
 831        if '=' in str(val) or '*' in str(val):
 832            _warn_invalids(num_invalid)
 833            return tuple(acceptable_args)
 834
 835        try:
 836            acceptable_args.append(args[i])
 837        except IndexError:
 838            acceptable_args.append(None)
 839            num_invalid += 1
 840
 841    _warn_invalids(num_invalid)
 842    return tuple(acceptable_args)
 843
 844
 845def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 846    """
 847    Convert an ordered dict to a dict.
 848    Does not mutate the original OrderedDict.
 849    """
 850    from collections import OrderedDict
 851    _d = dict(od)
 852    for k, v in od.items():
 853        if isinstance(v, OrderedDict) or (
 854            issubclass(type(v), OrderedDict)
 855        ):
 856            _d[k] = dict_from_od(v)
 857    return _d
 858
 859
 860def remove_ansi(s: str) -> str:
 861    """
 862    Remove ANSI escape characters from a string.
 863
 864    Parameters
 865    ----------
 866    s: str:
 867        The string to be cleaned.
 868
 869    Returns
 870    -------
 871    A string with the ANSI characters removed.
 872
 873    Examples
 874    --------
 875    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 876    'Hello, World!'
 877
 878    """
 879    import re
 880    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 881
 882
 883def get_connector_labels(
 884    *types: str,
 885    search_term: str = '',
 886    ignore_exact_match = True,
 887    _additional_options: Optional[List[str]] = None,
 888) -> List[str]:
 889    """
 890    Read connector labels from the configuration dictionary.
 891
 892    Parameters
 893    ----------
 894    *types: str
 895        The connector types.
 896        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 897
 898    search_term: str, default ''
 899        A filter on the connectors' labels.
 900
 901    ignore_exact_match: bool, default True
 902        If `True`, skip a connector if the search_term is an exact match.
 903
 904    Returns
 905    -------
 906    A list of the keys of defined connectors.
 907
 908    """
 909    from meerschaum.config import get_config
 910    connectors = get_config('meerschaum', 'connectors')
 911
 912    _types = list(types)
 913    if len(_types) == 0:
 914        _types = list(connectors.keys()) + ['plugin']
 915
 916    conns = []
 917    for t in _types:
 918        if t == 'plugin':
 919            from meerschaum.plugins import get_data_plugins
 920            conns += [
 921                f'{t}:' + plugin.module.__name__.split('.')[-1]
 922                for plugin in get_data_plugins()
 923            ]
 924            continue
 925        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 926
 927    if _additional_options:
 928        conns += _additional_options
 929
 930    possibilities = [
 931        c
 932        for c in conns
 933        if c.startswith(search_term)
 934            and c != (
 935                search_term if ignore_exact_match else ''
 936            )
 937    ]
 938    return sorted(possibilities)
 939
 940
 941def wget(
 942    url: str,
 943    dest: Optional[Union[str, 'pathlib.Path']] = None,
 944    headers: Optional[Dict[str, Any]] = None,
 945    color: bool = True,
 946    debug: bool = False,
 947    **kw: Any
 948) -> 'pathlib.Path':
 949    """
 950    Mimic `wget` with `requests`.
 951
 952    Parameters
 953    ----------
 954    url: str
 955        The URL to the resource to be downloaded.
 956
 957    dest: Optional[Union[str, pathlib.Path]], default None
 958        The destination path of the downloaded file.
 959        If `None`, save to the current directory.
 960
 961    color: bool, default True
 962        If `debug` is `True`, print color output.
 963
 964    debug: bool, default False
 965        Verbosity toggle.
 966
 967    Returns
 968    -------
 969    The path to the downloaded file.
 970
 971    """
 972    from meerschaum.utils.warnings import warn, error
 973    from meerschaum.utils.debug import dprint
 974    import re, urllib.request
 975    if headers is None:
 976        headers = {}
 977    request = urllib.request.Request(url, headers=headers)
 978    if not color:
 979        dprint = print
 980    if debug:
 981        dprint(f"Downloading from '{url}'...")
 982    try:
 983        response = urllib.request.urlopen(request)
 984    except Exception as e:
 985        import ssl
 986        ssl._create_default_https_context = ssl._create_unverified_context
 987        try:
 988            response = urllib.request.urlopen(request)
 989        except Exception as _e:
 990            print(_e)
 991            response = None
 992    if response is None or response.code != 200:
 993        error_msg = f"Failed to download from '{url}'."
 994        if color:
 995            error(error_msg)
 996        else:
 997            print(error_msg)
 998            import sys
 999            sys.exit(1)
1000
1001    d = response.headers.get('content-disposition', None)
1002    fname = (
1003        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1004        else url.split('/')[-1]
1005    )
1006
1007    if dest is None:
1008        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1009    elif isinstance(dest, str):
1010        dest = pathlib.Path(dest)
1011
1012    with open(dest, 'wb') as f:
1013        f.write(response.fp.read())
1014
1015    if debug:
1016        dprint(f"Downloaded file '{dest}'.")
1017
1018    return dest
1019
1020
def async_wrap(func):
    """
    Turn a synchronous function into a coroutine function which runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments:
    `loop` (defaults to `asyncio.get_event_loop()`) and `executor`
    (passed to `loop.run_in_executor`).
    """
    import asyncio
    from functools import wraps, partial

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        target_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = partial(func, *args, **kwargs)
        return await target_loop.run_in_executor(executor, bound_call)

    return run
1036
1037
def debug_trace(browser: bool = True):
    """
    Start the debugger to trace the execution of the program.

    This forwards to `meerschaum.utils.debug.trace`.

    Parameters
    ----------
    browser: bool, default True
        Passed through to `trace` (presumably opens the web-based debugger — confirm).
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
1046
1047
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str: str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable is accepted (e.g. a set or dictionary keys).

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.
        If `False`, every item is separated the same way.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    if not items:
        return ''

    ### Materialize once so that sets, generators, and dict views behave like lists.
    items = list(items)
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    quoted = [q + str(item) + q for item in items]
    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item is separated identically.
    if not and_:
        return (c + s).join(quoted)

    ### Two items need only the conjunction (no comma).
    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]
1134
1135
def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is an integer, assume it corresponds to minutes.

    round_unit: bool, default False
        If `True`, round the output to a single unit.

    Returns
    -------
    A formatted string, fit for human eyes.
    Negative intervals are formatted by magnitude and suffixed with `' ago'`.
    An integer-like `delta` is returned as `str(delta)` when `round_unit` is `False`.
    """
    from meerschaum.utils.packages import attempt_import
    if is_int(str(delta)) and not round_unit:
        return str(delta)

    humanfriendly = attempt_import('humanfriendly', lazy=False)
    delta_seconds = (
        delta.total_seconds()
        if hasattr(delta, 'total_seconds')
        else (delta * 60)
    )

    is_negative = delta_seconds < 0
    delta_seconds = abs(delta_seconds)
    replace_units = {}

    if round_unit:
        minute = 60
        hour = 3600
        day = 86400
        week = day * 7
        ### A "month" is treated as four weeks and a "year" as 364 days (52 weeks).
        month = week * 4
        year = day * 364

        if delta_seconds < 1:
            delta_seconds = round(delta_seconds, 2)
        elif delta_seconds < minute:
            delta_seconds = int(delta_seconds)
        elif delta_seconds < hour:
            delta_seconds = int(delta_seconds / minute) * minute
        elif delta_seconds < day:
            delta_seconds = int(delta_seconds / hour) * hour
        elif delta_seconds < week:
            delta_seconds = int(delta_seconds / day) * day
        elif delta_seconds < month:
            delta_seconds = int(delta_seconds / week) * week
        elif delta_seconds < (month * 13):
            ### Express N months as N weeks so the formatter prints the count N,
            ### then rename the unit below.
            delta_seconds = int(delta_seconds / month) * week
            ### Replace the singular stem so that both "1 week" and "N weeks" are renamed.
            replace_units['week'] = 'month'
        else:
            delta_seconds = int(delta_seconds / year) * year

    delta_str = humanfriendly.format_timespan(delta_seconds)
    if ',' in delta_str and round_unit:
        delta_str = delta_str.split(',')[0]
    elif ' and ' in delta_str and round_unit:
        delta_str = delta_str.split(' and ')[0]

    for parsed_unit, replacement_unit in replace_units.items():
        delta_str = delta_str.replace(parsed_unit, replacement_unit)

    return delta_str + (' ago' if is_negative else '')
1196
1197
def is_docker_available() -> bool:
    """
    Return `True` if `docker ps` exits with status 0.

    Any exception raised while launching the command is treated as "unavailable".
    """
    import subprocess
    command = ['docker', 'ps']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1208
1209
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    The check is whether the `sys` module exposes `getandroidapilevel`.
    """
    import sys as _sys
    on_android = hasattr(_sys, 'getandroidapilevel')
    return on_android
1214
1215
def is_bcp_available() -> bool:
    """
    Return `True` if the MSSQL `bcp` utility can be run (`bcp -v` exits with status 0).

    Any exception raised while launching the command is treated as "unavailable".
    """
    import subprocess
    command = ['bcp', '-v']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1227
1228
def is_systemd_available() -> bool:
    """
    Return `True` if `systemctl whoami` exits with status 0.

    A missing `systemctl` executable quietly yields `False`;
    any other exception is printed with `traceback.print_exc()` and also yields `False`.
    """
    import subprocess
    command = ['systemctl', 'whoami']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except FileNotFoundError:
        return False
    except Exception:
        import traceback
        traceback.print_exc()
        return False
    return exit_code == 0
1245
1246
def is_tmux_available() -> bool:
    """
    Return `True` if `tmux` is installed (`tmux -V` exits with status 0).

    Any exception raised while launching the command (including a missing
    executable) is treated as "unavailable".
    """
    import subprocess
    command = ['tmux', '-V']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1263
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file, similar to the `tail` command.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is opened in binary mode and scanned backwards one byte at a time,
    splitting on `b'\\n'`. Each line is decoded with the default codec of `bytes.decode()`.

    Parameters
    ----------
    file_name: str
        The path of the file to read.

    N: int
        The maximum number of lines to return.

    Returns
    -------
    A list of at most `N` strings in file order, without newline characters.
    If the file ends with a newline, the final element is an empty string.
    """
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at the end of the file and walk backwards.
        read_obj.seek(0, os.SEEK_END)
        ### Holds the bytes of the current line in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Restore the byte order BEFORE decoding,
                ### otherwise multi-byte characters are not valid UTF-8.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        ### Whatever remains in the buffer is the first line of the file.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return list(reversed(list_of_lines))
1303
1304
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file
        An open, seekable file object.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines to skip at the end of the file.

    Returns
    -------
    A tuple of the list of lines and a bool indicating
    whether earlier lines exist in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### The estimate grows by a float factor, but `seek()` requires an integer offset.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            ### The file is smaller than the requested step back: read it from the beginning.
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### When the read started mid-file, the first line may be partial,
        ### so require one extra line before trusting the last `to_read` lines.
        if len(lines) > to_read or pos == 0:
            end = -offset if offset else None
            return lines[-to_read:end], (len(lines) > to_read or pos > 0)
        avg_line_length *= 1.3
1329
1330
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.
        Strings shorter than `max_len` are returned unchanged.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already one character long and the result is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    ### An empty delimiter cannot be split on, so treat the whole string as one section.
    sections = item.split(delimeter) if delimeter else [item]

    ### Reserve room for one delimiter per section.
    available_chars = max_len - (len(delimeter) * len(sections))

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        ### Trim one trailing character from every section longer than one character.
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        new_sections_len = sum(len(section) for section in sections)
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)
1378
1379
def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string to append to the truncated `text` to indicate truncation.
        The suffix counts toward `max_length`.

    Returns
    -------
    A string of length `max_length` or less.
    """
    if len(text) <= max_length:
        return text

    max_length = max(max_length, 0)
    keep_length = max_length - len(suffix)

    ### Not enough room for any text: return as much of the suffix as fits
    ### (a negative slice bound would otherwise keep almost all of `text`).
    if keep_length <= 0:
        return suffix[:max_length]

    return text[:keep_length] + suffix
1411
1412
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Partition values into those without and those with the negation prefix.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        The values to partition. Each value is compared by its `str()` form.

    negation_prefix: Optional[str], default None
        Values whose string form begins with this prefix go into the second list.
        If `None`, the prefix is read from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    A tuple of two lists: the included values (left unchanged)
    and the excluded values (as strings with the prefix removed).
    """
    prefix = negation_prefix
    if prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    prefix_length = len(prefix)
    included, excluded = [], []
    for value in vals:
        text = str(value)
        if not text.startswith(prefix):
            included.append(value)
            continue
        excluded.append(text[prefix_length:])

    return included, excluded
1441
1442
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Split every value of a params dictionary into include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not a list, tuple, or set (and have no `astype` attribute)
        are treated as a single-element list.

    Returns
    -------
    A dictionary mapping each key to the tuple returned by `separate_negation_values()`
    (include values first, exclude values second).
    An empty dictionary is returned for empty or missing `params`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    def _as_collection(val):
        if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
            return val
        return [val]

    in_ex_params = {}
    for col, val in params.items():
        in_ex_params[col] = separate_negation_values(_as_collection(val))
    return in_ex_params
1473
1474
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Lazily yield the items of an arbitrarily nested list, depth-first.

    Only `list` instances are descended into; other containers
    (e.g. tuples) are yielded as single items.
    """
    ### Keep a stack of iterators: the top of the stack is the list currently being walked.
    pending = [iter(list_)]
    while pending:
        for element in pending[-1]:
            if isinstance(element, list):
                pending.append(iter(element))
                break
            yield element
        else:
            ### The innermost iterator is exhausted; resume its parent.
            pending.pop()
1484
1485
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The string is parsed with Python's own grammar, so commas and equals signs
    inside literals (strings, lists, dictionaries) are handled correctly.
    Values are evaluated with `ast.literal_eval`, so only literals are accepted
    and no code is executed.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).
    An empty or blank string returns `((), {})`.

    Raises
    ------
    SyntaxError
        If `args_str` is not valid call-argument syntax.

    ValueError
        If a value is not a literal or the string is not a plain argument list.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], sep="a,b=c"')
    (([1, 2],), {'sep': 'a,b=c'})
    """
    import ast

    if not args_str or not args_str.strip():
        return (), {}

    ### Wrap the arguments in a dummy call and let the parser split them.
    ### The newlines keep a trailing comment from swallowing the closing parenthesis.
    tree = ast.parse(f"_(\n{args_str}\n)", mode='eval')
    call = tree.body
    if not isinstance(call, ast.Call) or not isinstance(call.func, ast.Name):
        raise ValueError(f"Invalid arguments string: '{args_str}'")

    args = tuple(ast.literal_eval(arg_node) for arg_node in call.args)

    kwargs = {}
    for keyword in call.keywords:
        ### `keyword.arg` is `None` for `**mapping` unpacking.
        if keyword.arg is None:
            raise ValueError("Dictionary unpacking is not supported in an arguments string.")
        kwargs[keyword.arg] = ast.literal_eval(keyword.value)

    return args, kwargs
1517
1518
def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Create `dest_path` as a symlink to `src_path`, with fallbacks on Windows.

    If `pathlib.Path.symlink_to` fails on Windows, a junction is created for
    directories (via `_winapi.CreateJunction`) and files are copied instead.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path (the target of the link).

    dest_path: pathlib.Path
        The destination path (where the link is created).

    Returns
    -------
    A SuccessTuple indicating success.
    On failure, the message is the string form of the raised exception.
    """
    already_linked = dest_path.exists() and dest_path.resolve() == src_path.resolve()
    if already_linked:
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows has fallbacks; elsewhere report the error.
    import platform
    if platform.system() != 'Windows':
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)
    return True, "Success"
1572
1573
def is_symlink(path: pathlib.Path) -> bool:
    """
    Return `True` if `path` is a symlink, with support for Windows junctions.

    On Windows, a path for which `os.readlink()` returns a non-empty target
    is also considered a link. An `OSError` from `os.readlink()` yields `False`.
    """
    if path.is_symlink():
        return True

    import platform
    on_windows = platform.system() == 'Windows'
    if not on_windows:
        return False

    try:
        link_target = os.readlink(path)
    except OSError:
        return False
    return bool(link_target)
1588
1589
def parametrized(dec):
    """
    Turn `dec(func, *args, **kwargs)` into a decorator factory.

    After decorating `dec` with `parametrized`, calling `dec(*args, **kwargs)`
    returns a decorator which, applied to `func`, evaluates
    `dec(func, *args, **kwargs)`.

    https://stackoverflow.com/a/26151604/9699829
    """
    def accept_parameters(*args, **kwargs):
        return lambda f: dec(f, *args, **kwargs)
    return accept_parameters
1601
1602
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Every member's destination path is checked before anything is extracted.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If any member of the archive would be written outside of `output_dir`.
    """

    def is_within_directory(directory, target):
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        ### Compare whole path components: a character-wise prefix check would
        ### accept a sibling such as `/tmp/out_evil` for the directory `/tmp/out`.
        try:
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            ### Raised when the paths cannot be compared (e.g. different drives).
            return False
        return common_path == abs_directory

    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
        for member in tar.getmembers():
            member_path = os.path.join(path, member.name)
            if not is_within_directory(path, member_path):
                raise Exception("Attempted Path Traversal in Tar File")

        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)

    return safe_extract(tarf, output_dir)
1632
1633
def to_snake_case(name: str) -> str:
    """
    Convert text to snake case.

    Underscores are inserted at lower-to-upper case boundaries, characters which
    are neither word characters nor whitespace are removed, runs of whitespace
    become a single underscore, and the result is lower-cased.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re

    ### Applied in order; each step works on the output of the previous one.
    substitutions = (
        (r'(.)([A-Z][a-z]+)', r'\1_\2'),
        (r'([a-z0-9])([A-Z])', r'\1_\2'),
        (r'[^\w\s]', ''),
        (r'\s+', '_'),
    )
    snake = name
    for pattern, replacement in substitutions:
        snake = re.sub(pattern, replacement, snake)
    return snake.lower()
1662
1663
def get_directory_size(path: Union[str, pathlib.Path]) -> int:
    """
    Return the cumulative size of a directory's files in bytes.
    https://stackoverflow.com/a/55659577/9699829

    Parameters
    ----------
    path: Union[str, pathlib.Path]
        The directory to measure (searched recursively).

    Returns
    -------
    The sum of `st_size` for every file beneath `path`.
    Directory entries themselves are not counted.
    """
    root = pathlib.Path(path)
    return sum(
        entry.stat().st_size
        for entry in root.rglob('*')
        ### Skip directories (and anything else that is not a file, e.g. broken symlinks).
        if entry.is_file()
    )
1670
1671
1672##################
1673# Legacy imports #
1674##################
1675
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.actions.choose_subaction` and returns its result.
    """
    from meerschaum.actions import choose_subaction as _target
    result = _target(*args, **kwargs)
    return result
1683
1684
def print_options(*args, **kwargs) -> None:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.formatting.print_options`
    and returns its result.
    """
    from meerschaum.utils.formatting import print_options as _target
    result = _target(*args, **kwargs)
    return result
1692
1693
def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dtypes.to_pandas_dtype`
    and returns its result.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _target
    result = _target(*args, **kwargs)
    return result
1701
1702
def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.filter_unseen_df`
    and returns its result.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as _target
    result = _target(*args, **kwargs)
    return result
1710
1711
def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.add_missing_cols_to_df`
    and returns its result.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as _target
    result = _target(*args, **kwargs)
    return result
1719
1720
def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.parse_df_datetimes`
    and returns its result.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as _target
    result = _target(*args, **kwargs)
    return result
1728
1729
def df_from_literal(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.df_from_literal`
    and returns its result.
    """
    from meerschaum.utils.dataframe import df_from_literal as _target
    result = _target(*args, **kwargs)
    return result
1737
1738
def get_json_cols(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.get_json_cols`
    and returns its result.
    """
    from meerschaum.utils.dataframe import get_json_cols as _target
    result = _target(*args, **kwargs)
    return result
1746
1747
def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.get_unhashable_cols`
    and returns its result.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as _target
    result = _target(*args, **kwargs)
    return result
1755
1756
def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.enforce_dtypes`
    and returns its result.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as _target
    result = _target(*args, **kwargs)
    return result
1764
1765
def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.get_datetime_bound_from_df`
    and returns its result.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as _target
    result = _target(*args, **kwargs)
    return result
1773
1774
def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.df_is_chunk_generator`
    and returns its result.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as _target
    result = _target(*args, **kwargs)
    return result
1782
1783
def choices_docstring(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.actions.choices_docstring`
    and returns its result.
    """
    from meerschaum.actions import choices_docstring as _target
    result = _target(*args, **kwargs)
    return result
1791
1792
def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.actions._get_subaction_names`
    and returns its result.
    """
    from meerschaum.actions import _get_subaction_names as _target
    result = _target(*args, **kwargs)
    return result
1800
1801
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object for JSON by delegating to
    `meerschaum.utils.dtypes.serialize_datetime`.

    This function is suitable as the `default` argument of `json.dumps`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime as _serialize
    serialized = _serialize(dt)
    return serialized
1816
1817
def replace_pipes_in_dict(*args, **kwargs):
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.pipes.replace_pipes_in_dict`
    and returns its result.
    """
    from meerschaum.utils.pipes import replace_pipes_in_dict as _target
    result = _target(*args, **kwargs)
    return result
1825
1826
def is_pipe_registered(*args, **kwargs):
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.pipes.is_pipe_registered`
    and returns its result.
    """
    from meerschaum.utils.pipes import is_pipe_registered as _target
    result = _target(*args, **kwargs)
    return result
1834
1835
def round_time(*args, **kwargs):
    """
    Legacy alias kept so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dtypes.round_time`
    and returns its result.
    """
    from meerschaum.utils.dtypes import round_time as _target
    result = _target(*args, **kwargs)
    return result
1843
1844
### Build the public API dynamically: export the name of every callable in this
### module's globals which was defined here (its `__module__` matches this module,
### so imported helpers are skipped), is not listed in `__pdoc__`,
### and does not start with an underscore.
### NOTE: This must remain a generator expression. It has its own scope,
### so no new names are added to `globals()` while it is being iterated.
_current_module = sys.modules[__name__]
__all__ = tuple(
    name
    for name, obj in globals().items()
    if callable(obj)
        and name not in __pdoc__
        and getattr(obj, '__module__', None) == _current_module.__name__
        and not name.startswith('_')
)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Attach the function `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        Function to be added as a method of the class

    class_def: Class
        Class (or object) to be modified.

    method_name: Optional[str], default None
        New name of the method. None will use func.__name__ (default).

    keep_self: Optional[bool], default None
        If `False`, or if `None` and `class_def` is a class, the attached attribute
        is a wrapper which discards its first positional argument (`self`)
        before calling `func`. Otherwise `func` itself is attached.

    Returns
    -------
    The original function object `func`.

    """
    from functools import wraps

    attribute_name = func.__name__ if method_name is None else method_name

    target_is_class = isinstance(class_def, type)
    discard_self = (keep_self is False) or (keep_self is None and target_is_class)

    if discard_self:
        @wraps(func)
        def attached(self, *args, **kw):
            return func(*args, **kw)
    else:
        attached = func

    setattr(class_def, attribute_name, attached)
    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 96def generate_password(length: int = 12) -> str:
 97    """
 98    Generate a secure password of given length.
 99
100    Parameters
101    ----------
102    length: int, default 12
103        The length of the password.
104
105    Returns
106    -------
107    A random password string.
108    """
109    import secrets
110    import string
111    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
def is_int(s: str) -> bool:
    """
    Check whether a value represents a whole number.

    Parameters
    ----------
    s: str
        The string to be checked.

    Returns
    -------
    `True` if `float(s)` succeeds and the result has no fractional part
    (e.g. `'1'` and `'1.0'`), otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        return False
    return as_float.is_integer()

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def is_uuid(s: str) -> bool:
def is_uuid(s: str) -> bool:
    """
    Check whether the string form of a value parses as a UUID.

    Parameters
    ----------
    s: str
        The string to be checked.

    Returns
    -------
    `True` if `uuid.UUID(str(s))` succeeds, otherwise `False`.
    """
    import uuid
    try:
        uuid.UUID(str(s))
    except Exception:
        return False
    return True

Check if a string is a valid UUID.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string is a valid UUID.
def string_to_dict(params_string: str) -> Dict[str, Any]:
def string_to_dict(params_string: str) -> Dict[str, Any]:
    """
    Parse a string into a dictionary.

    If the string begins with '{', parse as JSON. Otherwise use simple parsing:
    the string is split on top-level commas (commas inside quotes, brackets, or
    braces are kept), and each piece is split on its first colon into a key and a value.
    Values are evaluated with `ast.literal_eval` and kept as strings if that fails.
    Pieces without a colon are ignored.

    Parameters
    ----------
    params_string: str
        The string to be parsed.

    Returns
    -------
    The parsed dictionary.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if not params_string:
        return {}

    import json

    ### Edge case: in the generated compose file, escaping may leave the JSON
    ### wrapped in one extra character on each side (e.g. single quotes).
    wrapped_json = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if wrapped_json:
        return json.loads(params_string[1:-1])

    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    ### Pass 1: split on commas which are outside of quotes, brackets, and braces.
    closer_to_opener = {']': '[', '}': '{'}
    depth = {'[': 0, '{': 0}
    active_quote = None
    pieces = []
    buffer = []
    for idx, ch in enumerate(params_string):
        if active_quote is not None:
            ### A quote preceded by a backslash does not close the quoted section.
            if ch == active_quote and (idx == 0 or params_string[idx - 1] != '\\'):
                active_quote = None
        elif ch in ('"', "'"):
            active_quote = ch
        elif ch in depth:
            depth[ch] += 1
        elif ch in closer_to_opener:
            depth[closer_to_opener[ch]] -= 1
        elif ch == ',' and depth['['] == 0 and depth['{'] == 0:
            pieces.append(''.join(buffer))
            buffer = []
            continue
        buffer.append(ch)

    if buffer:
        pieces.append(''.join(buffer))

    ### Pass 2: split each piece into a key and a literal value.
    parsed = {}
    for piece in pieces:
        piece = piece.strip()
        if not piece:
            continue

        key, colon, raw_value = piece.partition(':')
        if not colon:
            continue

        try:
            value = ast.literal_eval(raw_value)
        except Exception:
            value = str(raw_value)

        parsed[key] = value

    return parsed

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def to_simple_dict(doc: Dict[str, Any]) -> str:
def to_simple_dict(doc: Dict[str, Any]) -> str:
    """
    Serialize a document dictionary in simple-dict format.

    Parameters
    ----------
    doc: Dict[str, Any]
        The document whose items are serialized.

    Returns
    -------
    A string of comma-separated `key:value` pairs.
    Non-string values are JSON-encoded with compact separators.
    Strings which would evaluate as a non-string Python literal (e.g. `'1'`)
    are JSON-encoded (and therefore quoted); all other strings are written as-is.
    """
    import ast
    import json
    from meerschaum.utils.dtypes import json_serialize_value

    def _dump(obj):
        return json.dumps(obj, separators=(',', ':'), default=json_serialize_value)

    def _render(obj):
        if not isinstance(obj, str):
            return _dump(obj)
        try:
            parsed = ast.literal_eval(obj)
        except (ValueError, SyntaxError, TypeError, MemoryError):
            ### Not a Python literal, so the raw text is unambiguous.
            return obj
        return obj if isinstance(parsed, str) else _dump(obj)

    return ','.join(f"{name}:{_render(item)}" for name, item in doc.items())

Serialize a document dictionary in simple-dict format.

def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':',
) -> List[Any]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to parse.

    leading_key: str, default 'MRSM'
        The prefix which marks `value` as a substitution.

    begin_key: str, default '{'
        The opening token which follows `leading_key`.

    end_key: str, default '}'
        The closing token at the end of `value`.

    delimeter: str, default ':'
        The separator between the keys inside the substitution.

    Returns
    -------
    The list of keys between `begin_key` and `end_key`.
    If `value` does not start with `leading_key`, `value` is returned unchanged.
    """
    if not value.startswith(leading_key):
        return value

    inner = value[len(leading_key):]
    if inner.startswith(begin_key):
        inner = inner[len(begin_key):]
    ### Guard against an empty `end_key`, for which `[:-0]` would drop everything.
    if end_key and inner.endswith(end_key):
        inner = inner[:-len(end_key)]

    return inner.split(delimeter)

Parse Meerschaum substitution syntax E.g. MRSM{value1:value2} => ['value1', 'value2'] NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
def edit_file(
    path: Union[pathlib.Path, str],
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Open a file in an editor and report whether the editor exited cleanly.

    The editor named by the `$EDITOR` environment variable is launched;
    if launching raises an exception, `pyvim` is run as a Python package instead.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        The editor to launch when `$EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if the editor's return code was 0, otherwise `False`.
    """
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor}'...")
        return_code = call([editor, path])
    except Exception as err:
        ### The system editor could not be launched.
        if debug:
            dprint(str(err))
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim_module = attempt_import('pyvim', lazy=False)
        return_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim_module),
            debug=debug,
        )

    return return_code == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Determine the columns and lines in the terminal.
    If they cannot be determined, fall back to the `COLUMNS` and `LINES`
    environment variables, then to the default values.

    Parameters
    ----------
    default_cols: int, default 100
        If the columns cannot be determined, return this value.

    default_lines: int, default 120
        If the lines cannot be determined, return this value.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    def _from_env(name: str, default: int) -> int:
        ### An unset, empty, or non-numeric variable yields the default
        ### instead of raising `ValueError`.
        try:
            return int(os.environ.get(name, str(default)))
        except (TypeError, ValueError):
            return default

    try:
        size = os.get_terminal_size()
    except Exception:
        return _from_env('COLUMNS', default_cols), _from_env('LINES', default_lines)

    return size.columns, size.lines

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Walk over an iterable in fixed-size groups.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to iterate over in chunks.

    chunksize: int
        The size of chunks to iterate with.

    fillvalue: Optional[Any], default None
        If the chunks do not evenly divide into the iterable, pad the end with this value.

    Returns
    -------
    An iterator of tuples of size `chunksize`.

    """
    from itertools import zip_longest
    ### Every slot pulls from the same iterator, so each tuple holds consecutive items.
    shared = iter(iterable)
    return zip_longest(*(shared for _ in range(chunksize)), fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Build a new dictionary whose items are ordered by value.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary ordered by ascending value.
    If sorting fails (e.g. the values are not comparable), `d` itself is returned.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        return dict(sorted(d.items(), key=lambda pair: pair[1]))
    except Exception:
        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> 'List[Pipe]':
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collapse the three-level pipes dictionary into a flat list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The pipes dictionary to be flattened.

    Returns
    -------
    A list of the innermost values (`Pipe` objects), in iteration order.

    """
    return [
        pipe
        for second_level in pipes_dict.values()
        for third_level in second_level.values()
        for pipe in third_level.values()
    ]

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
def timed_input(
    seconds: int = 10,
    timeout_message: str = "",
    prompt: str = "",
    icon: bool = False,
    **kw
) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    Relies on `signal.SIGALRM` to interrupt the blocking `input()` call.
    The previously installed `SIGALRM` handler is restored before returning.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        Accepted for backward compatibility; not used by this implementation.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        Accepted for backward compatibility; not used by this implementation.

    Returns
    -------
    The input string entered by the user,
    or `None` if the window elapsed or input could not be read.

    """
    import signal
    import time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Remember the existing handler so this call does not leave ours installed.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(seconds) # produce SIGALRM in `seconds` seconds

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        ### No readable stdin: show the prompt and wait out the window instead.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            pass
        return None
    finally:
        signal.alarm(0) # cancel alarm
        ### `signal.signal` returns `None` for handlers not installed from Python,
        ### and `None` cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def enforce_gevent_monkey_patch():
def enforce_gevent_monkey_patch():
    """
    Apply gevent monkey patching unless `socket.socket` is already gevent's socket class.
    """
    import socket
    from meerschaum.utils.packages import attempt_import

    _, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    if socket.socket is gevent_socket.socket:
        ### Already patched.
        return
    gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Test a string against a simple email pattern.

    The pattern accepts lowercase alphanumeric local parts (with at most one
    `.` or `_` separator), an `@`, a word-character domain, and a
    two- or three-character suffix.

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>

    """
    import re
    email_pattern = re.compile(r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
    return email_pattern.search(email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
def string_width(string: str, widest: bool = True) -> int:
    """
    Measure a string by the character count of its longest line.

    Parameters
    ----------
    string: str
        The string to be examined.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer for the length of the longest newline-separated line.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    return max((len(line) for line in string.split('\n')), default=0)

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
    """
    Follow a sequence of keys through nested containers and return what is found.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to search.

    path: Tuple[Any, ...]
        The path of keys to traverse.

    Returns
    -------
    The value from the end of the path.
    An empty path returns `d` itself.
    """
    node = d
    for key in path:
        node = node[key]
    return node

Get a value from a dictionary with a tuple of keys.

Parameters
  • d (Dict[Any, Any]): The dictionary to search.
  • path (Tuple[Any, ...]): The path of keys to traverse.
Returns
  • The value from the end of the path.
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
    """
    Assign a value at the location described by a tuple of keys.

    Every key except the last must already exist, because the parent container
    is looked up with `get_val_from_dict_path`.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to modify in place.

    path: Tuple[Any, ...]
        The path of keys to traverse.

    val: Any
        The value to set at the end of the path.
    """
    parent = get_val_from_dict_path(d, path[:-1])
    parent[path[-1]] = val

Set a value in a dictionary with a tuple of keys.

Parameters
  • d (Dict[Any, Any]): The dictionary to search.
  • path (Tuple[Any, ...]): The path of keys to traverse.
  • val (Any): The value to set at the end of the path.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        This is also applied to nested dictionaries.

    Returns
    -------
    A deep copy of `d` in which the values of keys containing `'password'`
    (case-insensitive) are masked with `replace_with`.
    For keys named `'uri'`, the password portion of the URI is masked.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy
    masked = copy.deepcopy(d)
    for key, value in d.items():
        if isinstance(value, dict):
            ### Forward `replace_with` so nested passwords use the same mask.
            masked[key] = replace_password(value, replace_with=replace_with)
            continue

        key_lower = str(key).lower()
        if 'password' in key_lower:
            masked[key] = replace_with * len(str(value))
            continue

        if key_lower != 'uri':
            continue

        from meerschaum.connectors.sql import SQLConnector
        try:
            uri_params = SQLConnector.parse_uri(value)
        except Exception:
            uri_params = None
        if not uri_params:
            continue
        if 'username' not in uri_params or 'password' not in uri_params:
            continue

        username = uri_params['username']
        password = uri_params['password']
        masked[key] = value.replace(
            username + ':' + password,
            username + ':' + replace_with * len(str(password)),
        )
    return masked

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where the values of the keys 'password' are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments( func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Tuple[Tuple[Any], Dict[str, Any]]:
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Reduce positional and keyword arguments to those `func` supports.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter with `filter_positionals`.

    **kwargs
        Keyword arguments to filter with `filter_keywords`.

    Returns
    -------
    A tuple of the filtered `args` and the filtered `kwargs`.
    """
    accepted_args = filter_positionals(func, *args)
    accepted_kwargs = filter_keywords(func, **kwargs)
    return accepted_args, accepted_kwargs

Filter out unsupported positional and keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): Positional arguments to filter and pass to func.
  • **kwargs: Keyword arguments to filter and pass to func.
Returns
  • The args and kwargs accepted by func.
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs` parameter, `kw` is returned unfiltered.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters

    ### Inspect the parameter kind rather than its string form,
    ### which may contain '**' inside a default value.
    if any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    ):
        return kw

    ### `*args` and positional-only parameters cannot be passed by keyword.
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return {
        key: val
        for key, val in kw.items()
        if key in func_params and func_params[key].kind in keyword_kinds
    }

Filter out unsupported keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(func: Callable[[Any], Any], *args: Any) -> Tuple[Any]:
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Only the leading positional parameters without defaults are filled.
    Collection stops at the first parameter which has a default value
    or which is not positional (`*args`, keyword-only, or `**kwargs`).

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` and a warning is emitted.

    Returns
    -------
    A tuple of positional arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect

    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    acceptable_args: List[Any] = []
    num_invalid: int = 0

    for i, param in enumerate(inspect.signature(func).parameters.values()):
        ### Check the parameter's kind and default directly; a required
        ### keyword-only parameter must not be treated as positional.
        if (
            param.kind not in positional_kinds
            or param.default is not inspect.Parameter.empty
        ):
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Imported lazily because it is only needed when a warning is emitted.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have ' if num_invalid != 1 else " has ")
            + " been filled with `None`.",
        )

    return tuple(acceptable_args)

Filter out unsupported positional arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
  • A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
...     return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an ordered dict, converting nested ordered dicts as well.
    The original OrderedDict is left untouched.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
def remove_ansi(s: str) -> str:
    """
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI escape sequences removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    ansi_pattern = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    return ansi_pattern.sub('', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels( *types: str, search_term: str = '', ignore_exact_match=True, _additional_options: Optional[List[str]] = None) -> List[str]:
def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match = True,
    _additional_options: Optional[List[str]] = None,
) -> List[str]:
    """
    List connector keys (`type:label`) from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use every type in the connectors configuration and `'plugin'`.

    search_term: str, default ''
        Only keys which start with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    _additional_options: Optional[List[str]], default None
        Extra keys to consider alongside the configured connectors.

    Returns
    -------
    A sorted list of the keys of defined connectors.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    selected_types = list(types)
    if not selected_types:
        selected_types = list(connectors.keys()) + ['plugin']

    labels = []
    for conn_type in selected_types:
        if conn_type != 'plugin':
            ### The 'default' entry is not a real connector label.
            labels.extend(
                f'{conn_type}:{label}'
                for label in connectors.get(conn_type, {})
                if label != 'default'
            )
            continue

        from meerschaum.plugins import get_data_plugins
        labels.extend(
            f'{conn_type}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        )

    if _additional_options:
        labels.extend(_additional_options)

    excluded = search_term if ignore_exact_match else ''
    return sorted(
        key
        for key in labels
        if key.startswith(search_term) and key != excluded
    )

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
 942def wget(
 943    url: str,
 944    dest: Optional[Union[str, 'pathlib.Path']] = None,
 945    headers: Optional[Dict[str, Any]] = None,
 946    color: bool = True,
 947    debug: bool = False,
 948    **kw: Any
 949) -> 'pathlib.Path':
 950    """
 951    Mimic `wget` with `requests`.
 952
 953    Parameters
 954    ----------
 955    url: str
 956        The URL to the resource to be downloaded.
 957
 958    dest: Optional[Union[str, pathlib.Path]], default None
 959        The destination path of the downloaded file.
 960        If `None`, save to the current directory.
 961
 962    color: bool, default True
 963        If `debug` is `True`, print color output.
 964
 965    debug: bool, default False
 966        Verbosity toggle.
 967
 968    Returns
 969    -------
 970    The path to the downloaded file.
 971
 972    """
 973    from meerschaum.utils.warnings import warn, error
 974    from meerschaum.utils.debug import dprint
 975    import re, urllib.request
 976    if headers is None:
 977        headers = {}
 978    request = urllib.request.Request(url, headers=headers)
 979    if not color:
 980        dprint = print
 981    if debug:
 982        dprint(f"Downloading from '{url}'...")
 983    try:
 984        response = urllib.request.urlopen(request)
 985    except Exception as e:
 986        import ssl
 987        ssl._create_default_https_context = ssl._create_unverified_context
 988        try:
 989            response = urllib.request.urlopen(request)
 990        except Exception as _e:
 991            print(_e)
 992            response = None
 993    if response is None or response.code != 200:
 994        error_msg = f"Failed to download from '{url}'."
 995        if color:
 996            error(error_msg)
 997        else:
 998            print(error_msg)
 999            import sys
1000            sys.exit(1)
1001
1002    d = response.headers.get('content-disposition', None)
1003    fname = (
1004        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1005        else url.split('/')[-1]
1006    )
1007
1008    if dest is None:
1009        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1010    elif isinstance(dest, str):
1011        dest = pathlib.Path(dest)
1012
1013    with open(dest, 'wb') as f:
1014        f.write(response.fp.read())
1015
1016    if debug:
1017        dprint(f"Downloaded file '{dest}'.")
1018
1019    return dest

Mimic wget with urllib.request.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
def async_wrap(func):
    """
    Wrap a synchronous function in a coroutine which runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts the optional keyword arguments
    `loop` (defaults to `asyncio.get_event_loop()`) and `executor`
    (passed to `loop.run_in_executor`); everything else is forwarded to `func`.
    """
    import asyncio
    from functools import wraps, partial

    async def run(*args, loop=None, executor=None, **kwargs):
        event_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = partial(func, *args, **kwargs)
        result = await event_loop.run_in_executor(executor, bound_call)
        return result

    return wraps(func)(run)
def debug_trace(browser: bool = True):
def debug_trace(browser: bool = True):
    """
    Open a debugger to trace the execution of the program.

    This is a thin wrapper around `meerschaum.utils.debug.trace`,
    to which `browser` is forwarded.
    """
    from meerschaum.utils import debug as _debug
    _debug.trace(browser=browser)

Open a web-based debugger to trace the execution of the program.

This is an alias for meerschaum.utils.debug.trace.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable is accepted (e.g. sets, dictionary keys, generators).

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.
        If `False`, every item (including the last) is separated by the comma.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    # Materialize once so that non-sliceable iterables (sets, dict views, generators) work.
    quoted = [q + str(item) + q for item in items]
    if not quoted:
        return ''

    if len(quoted) == 1:
        return quoted[0]

    # Without a conjunction, the list is purely comma-separated
    # (the Oxford comma setting is irrelevant here).
    if not and_:
        return (c + s).join(quoted)

    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]

Return a formatted string of list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in an English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int], round_unit: bool = False) -> str:
def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is an integer, assume it corresponds to minutes.

    round_unit: bool, default False
        If `True`, round the output to a single unit.

    Returns
    -------
    A formatted string, fit for human eyes.
    Negative intervals are formatted by magnitude and suffixed with `' ago'`.
    An integer `delta` with `round_unit=False` is returned as its plain string.
    """
    from meerschaum.utils.packages import attempt_import
    if is_int(str(delta)) and not round_unit:
        return str(delta)

    humanfriendly = attempt_import('humanfriendly', lazy=False)
    delta_seconds = (
        delta.total_seconds()
        if hasattr(delta, 'total_seconds')
        else (delta * 60)
    )

    is_negative = delta_seconds < 0
    delta_seconds = abs(delta_seconds)
    replace_units = {}

    minute, hour, day = 60, 3600, 86400
    week = day * 7
    month = week * 4
    year = week * 52

    if round_unit:
        ### Truncate down to a whole number of the largest applicable unit.
        if delta_seconds < 1:
            delta_seconds = round(delta_seconds, 2)
        elif delta_seconds < minute:
            delta_seconds = int(delta_seconds)
        elif delta_seconds < hour:
            delta_seconds = int(delta_seconds / minute) * minute
        elif delta_seconds < day:
            delta_seconds = int(delta_seconds / hour) * hour
        elif delta_seconds < week:
            delta_seconds = int(delta_seconds / day) * day
        elif delta_seconds < month:
            delta_seconds = int(delta_seconds / week) * week
        elif delta_seconds < year:
            ### Express N whole months as N weeks, then rename the unit below.
            ### Replace the singular stem so that both "1 week" and "N weeks" are renamed.
            delta_seconds = int(delta_seconds / month) * week
            replace_units['week'] = 'month'
        else:
            delta_seconds = int(delta_seconds / year) * year

    delta_str = humanfriendly.format_timespan(delta_seconds)
    if ',' in delta_str and round_unit:
        delta_str = delta_str.split(',')[0]
    elif ' and ' in delta_str and round_unit:
        delta_str = delta_str.split(' and ')[0]

    for parsed_unit, replacement_unit in replace_units.items():
        delta_str = delta_str.replace(parsed_unit, replacement_unit)

    return delta_str + (' ago' if is_negative else '')

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
  • round_unit (bool, default False): If True, round the output to a single unit.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Returns
    -------
    `True` if `docker ps` exits with status 0.
    `False` if it exits non-zero or the command cannot be run at all.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if we can connect to the Docker engine.

def is_android() -> bool:
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    Detection relies on the presence of `sys.getandroidapilevel`.
    """
    import sys
    android_marker = 'getandroidapilevel'
    return hasattr(sys, android_marker)

Return True if the current platform is Android.

def is_bcp_available() -> bool:
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Returns
    -------
    `True` if `bcp -v` exits with status 0.
    `False` if it exits non-zero or the command cannot be run at all.
    """
    import subprocess

    try:
        exit_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if the MSSQL bcp utility is installed.

def is_systemd_available() -> bool:
def is_systemd_available() -> bool:
    """
    Check if running on systemd.

    Returns
    -------
    `True` if `systemctl whoami` exits with status 0, otherwise `False`.
    A missing `systemctl` binary is silently treated as unavailable;
    any other error prints its traceback before returning `False`.
    """
    import subprocess
    command = ['systemctl', 'whoami']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except FileNotFoundError:
        return False
    except Exception:
        import traceback
        traceback.print_exc()
        return False
    return exit_code == 0

Check if running on systemd.

def is_tmux_available() -> bool:
def is_tmux_available() -> bool:
    """
    Check if `tmux` is installed.

    Returns
    -------
    `True` if `tmux -V` exits with status 0.
    `False` if it exits non-zero or the command cannot be run at all.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        ### Includes `FileNotFoundError` (no `tmux` binary on the `PATH`).
        return False
    return exit_code == 0

Check if tmux is installed.

def get_last_n_lines(file_name: str, N: int):
def get_last_n_lines(file_name: str, N: int) -> List[str]:
    """
    Return the last `N` lines of a file (like the `tail` command).
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is scanned backwards one byte at a time from the end,
    so only the requested lines are held in memory.

    Parameters
    ----------
    file_name: str
        The path of the file to read.

    N: int
        The number of lines to return.
        If the file has fewer than `N` lines (or `N` is not positive), return every line.

    Returns
    -------
    A list of the lines in file order, decoded as UTF-8 and without newline characters.
    If the file ends with a newline, the final entry is an empty string.
    """
    # Lines are collected last-to-first and reversed upon returning.
    found_lines = []
    with open(file_name, 'rb') as read_obj:
        read_obj.seek(0, os.SEEK_END)
        # Bytes of the line currently being read, in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                # Restore the byte order BEFORE decoding:
                # reversed multi-byte UTF-8 sequences are not valid UTF-8.
                found_lines.append(buffer[::-1].decode())
                if len(found_lines) == N:
                    return list(reversed(found_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # The start of the file was reached: whatever remains is the first line.
        if len(buffer) > 0:
            found_lines.append(buffer[::-1].decode())
    return list(reversed(found_lines))
def tail(f, n, offset=None):
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file object
        A seekable, readable file object.
        Files which refuse end-relative seeks (raising `IOError`) are read from the beginning.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines at the end of the file to skip.

    Returns
    -------
    A tuple of the list of lines (without line endings) and the `has_more` flag.
    """
    avg_line_length = 74
    skip = offset or 0
    to_read = n + skip

    if to_read <= 0:
        ### Nothing was requested: only report whether the file has content.
        f.seek(0, 2)
        return [], f.tell() > 0

    while True:
        try:
            ### Seek offsets must be integers (the estimate becomes a float after growing).
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### Unless we read from the very beginning, the first line may be cut off,
        ### so require one line more than needed before trusting the result.
        if pos == 0 or len(lines) > to_read:
            end = -skip if skip else None
            has_more = len(lines) > to_read or pos > 0
            return lines[-to_read:end], has_more
        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.
        Strings shorter than `max_len` are returned unchanged.

    delimeter: str, default '_'
        Split `item` by this string into several sections.
        The truncated sections are re-joined with the same string.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    An `Exception` if the sections cannot be shortened enough
    (sections are never shortened below one character).

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    def _shorten(s: str) -> str:
        return s[:-1] if len(s) > 1 else s

    sections = item.split(delimeter)
    ### Reserve room for one delimiter per section.
    available_chars = max_len - (len(sections) * len(delimeter))

    sections_len = sum(len(s) for s in sections)
    while sections_len > available_chars:
        ### Trim one character off of every section per pass.
        sections = [_shorten(s) for s in sections]
        new_sections_len = sum(len(s) for s in sections)
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def truncate_text_for_display(text: str, max_length: int = 50, suffix: str = '…') -> str:
def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.
        Negative values are treated as 0.

    suffix: str, default '…'
        The string appended to the truncated text to indicate truncation.
        The suffix counts toward `max_length`.

    Returns
    -------
    A string of length `max_length` or less.
    """
    limit = max(max_length, 0)
    if len(text) <= limit:
        return text

    ### Leave room for the suffix (never slice with a negative index).
    keep_length = max(limit - len(suffix), 0)

    ### The final slice only matters when the suffix itself is longer than the limit.
    return (text[:keep_length] + suffix)[:limit]

Truncate a potentially long string for display purposes.

Parameters
  • text (str): The string to be truncated.
  • max_length (int, default 50): The maximum length of text before truncation.
  • suffix (str, default '…'): The string appended to the truncated text to indicate truncation.
Returns
  • A string of length max_length or less.
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix go in the second list.
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of two lists: the positive values (as given)
    and the negated values (as strings with the prefix removed).
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    prefix_length = len(negation_prefix)
    included, excluded = [], []
    for value in vals:
        value_str = str(value)
        if not value_str.startswith(negation_prefix):
            ### Positive values are kept as-is (not cast to strings).
            included.append(value)
            continue
        excluded.append(value_str[prefix_length:])

    return included, excluded

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not a list, tuple, set, or an object with an `astype` attribute
        are treated as single-item lists.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.
    An empty dictionary is returned when `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    def _as_collection(val: Any) -> Any:
        if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
            return val
        return [val]

    in_ex_params = {}
    for col, val in params.items():
        in_ex_params[col] = separate_negation_values(_as_collection(val))
    return in_ex_params

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Recursively flatten a list.

    This is a generator: it lazily yields the non-list items in order,
    descending into nested `list` objects only (tuples and other iterables are yielded as-is).
    """
    for element in list_:
        if not isinstance(element, list):
            yield element
            continue
        for nested_element in flatten_list(element):
            yield nested_element

Recursively flatten a list.

def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The string is parsed (never executed) as the argument list of a function call,
    so commas and equals signs inside of strings, lists, and dictionaries are handled correctly.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).
        Only Python literals are allowed as values.

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).
    An empty string produces `((), {})`.

    Raises
    ------
    A `SyntaxError` if `args_str` is not a valid argument list,
    or a `ValueError` if a value is not a literal.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], sep=","')
    (([1, 2],), {'sep': ','})
    """
    import ast

    ### Wrap the arguments in a dummy call and let the Python parser split them.
    ### The surrounding newlines allow for multi-line input and trailing comments.
    call_node = ast.parse(f"_f(\n{args_str}\n)", mode='eval').body

    ### Anything other than a single call to our dummy name
    ### means the text closed the parenthesis early.
    if (
        not isinstance(call_node, ast.Call)
        or not isinstance(call_node.func, ast.Name)
        or call_node.func.id != '_f'
    ):
        raise ValueError(f"Invalid arguments string: '{args_str}'")

    args = [ast.literal_eval(arg_node) for arg_node in call_node.args]

    kwargs = {}
    for keyword_node in call_node.keywords:
        ### `arg` is `None` for `**mapping` unpacking, which is not a literal key-value pair.
        if keyword_node.arg is None:
            raise ValueError(f"Invalid arguments string: '{args_str}'")
        kwargs[keyword_node.arg] = ast.literal_eval(keyword_node.value)

    return tuple(args), kwargs

Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.

Parameters
  • args_str (str): The contents of the function parameter (as a string).
Returns
  • A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
def parametrized(dec):
def parametrized(dec):
    """
    A meta-decorator which lets another decorator accept parameters.

    The decorated decorator `dec` is expected to take the target function
    as its first argument, followed by its own parameters.
    Calling the result with those parameters returns the actual decorator.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*dec_args, **dec_kwargs):
        def apply_to(target):
            decorated = dec(target, *dec_args, **dec_kwargs)
            return decorated
        return apply_to
    return layer

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Every member's name is resolved against `output_dir` before anything is extracted;
    if any member would land outside of `output_dir`, nothing is extracted.
    NOTE: Only member names are checked here (not the targets of links).

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if a member's path escapes `output_dir`.
    """

    def is_within_directory(directory, target) -> bool:
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        ### Compare whole path components.
        ### A character-wise prefix check would accept siblings like `out_evil` for `out`.
        try:
            return os.path.commonpath([abs_directory, abs_target]) == abs_directory
        except ValueError:
            ### Raised when the paths cannot share a root (e.g. different drives).
            return False

    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(output_dir, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)

Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def to_snake_case(name: str) -> str:
def to_snake_case(name: str) -> str:
    """
    Convert the given text to snake case.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re

    ### Applied in order: split capitalized words, split lower-to-upper boundaries,
    ### drop punctuation, then collapse whitespace into underscores.
    substitutions = (
        (r'(.)([A-Z][a-z]+)', r'\1_\2'),
        (r'([a-z0-9])([A-Z])', r'\1_\2'),
        (r'[^\w\s]', ''),
        (r'\s+', '_'),
    )
    snake_name = name
    for pattern, replacement in substitutions:
        snake_name = re.sub(pattern, replacement, snake_name)
    return snake_name.lower()

Return the given string in snake-case-style.

Parameters
  • name (str): The input text to convert to snake case.
Returns
  • A snake-case version of name.
Examples
>>> to_snake_case("HelloWorld!")
'hello_world'
>>> to_snake_case("This has spaces in it.")
'this_has_spaces_in_it'
>>> to_snake_case("already_in_snake_case")
'already_in_snake_case'
def get_directory_size(path: 'Path') -> int:
def get_directory_size(path: Union[str, pathlib.Path]) -> int:
    """
    Return the cumulative size of a directory's files in bytes.
    https://stackoverflow.com/a/55659577/9699829

    Parameters
    ----------
    path: Union[str, pathlib.Path]
        The directory to be searched recursively.

    Returns
    -------
    The sum of the sizes (in bytes) of the files beneath `path`.
    The directory entries themselves are not counted.
    """
    root_path = pathlib.Path(path)
    return sum(
        entry.stat().st_size
        for entry in root_path.rglob('*')
        if entry.is_file()
    )

Return the cumulative size of a directory's files in bytes. https://stackoverflow.com/a/55659577/9699829

def choose_subaction(*args, **kwargs) -> Any:
def choose_subaction(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choose_subaction`.

    All positional and keyword arguments are forwarded unchanged,
    and the result of `meerschaum.actions.choose_subaction` is returned.
    """
    # Imported at call time rather than at module import.
    from meerschaum.actions import choose_subaction as _choose_subactions
    return _choose_subactions(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choose_subaction.

def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    This delegates to `meerschaum.utils.dtypes.serialize_datetime`
    and may be passed as the `default` argument of `json.dumps()` (see the example below).

    Parameters
    ----------
    dt: datetime
        The datetime object to serialize.

    Returns
    -------
    Whatever `meerschaum.utils.dtypes.serialize_datetime` returns for `dt`
    (annotated as a string or `None`).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    # Imported at call time rather than at module import.
    from meerschaum.utils.dtypes import serialize_datetime
    return serialize_datetime(dt)

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'