meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9
  10import sys
  11import functools
  12from datetime import timedelta, datetime
  13
  14from meerschaum.utils.typing import (
  15    Union,
  16    Any,
  17    Callable,
  18    Optional,
  19    List,
  20    Dict,
  21    SuccessTuple,
  22    Iterable,
  23    PipesDict,
  24    Tuple,
  25    TYPE_CHECKING,
  26)
  27if TYPE_CHECKING:
  28    import collections
  29
  30__pdoc__: Dict[str, bool] = {
  31    'to_pandas_dtype': False,
  32    'filter_unseen_df': False,
  33    'add_missing_cols_to_df': False,
  34    'parse_df_datetimes': False,
  35    'df_from_literal': False,
  36    'get_json_cols': False,
  37    'get_unhashable_cols': False,
  38    'enforce_dtypes': False,
  39    'get_datetime_bound_from_df': False,
  40    'df_is_chunk_generator': False,
  41    'choices_docstring': False,
  42    '_get_subaction_names': False,
  43    'is_pipe_registered': False,
  44    'replace_pipes_in_dict': False,
  45    'round_time': False,
  46}
  47
  48
  49def add_method_to_class(
  50    func: Callable[[Any], Any],
  51    class_def: 'Class',
  52    method_name: Optional[str] = None,
  53    keep_self: Optional[bool] = None,
  54) -> Callable[[Any], Any]:
  55    """
  56    Add function `func` to class `class_def`.
  57
  58    Parameters
  59    ----------
  60    func: Callable[[Any], Any]
  61        Function to be added as a method of the class
  62
  63    class_def: Class
  64        Class to be modified.
  65
  66    method_name: Optional[str], default None
  67        New name of the method. None will use func.__name__ (default).
  68
  69    Returns
  70    -------
  71    The modified function object.
  72
  73    """
  74    from functools import wraps
  75
  76    is_class = isinstance(class_def, type)
  77    
  78    @wraps(func)
  79    def wrapper(self, *args, **kw):
  80        return func(*args, **kw)
  81
  82    if method_name is None:
  83        method_name = func.__name__
  84
  85    setattr(class_def, method_name, (
  86            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  87        )
  88    )
  89
  90    return func
  91
  92
  93def generate_password(length: int = 12) -> str:
  94    """
  95    Generate a secure password of given length.
  96
  97    Parameters
  98    ----------
  99    length: int, default 12
 100        The length of the password.
 101
 102    Returns
 103    -------
 104    A random password string.
 105    """
 106    import secrets
 107    import string
 108    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))
 109 
 110
 111def is_int(s: str) -> bool:
 112    """
 113    Check if string is an int.
 114
 115    Parameters
 116    ----------
 117    s: str
 118        The string to be checked.
 119        
 120    Returns
 121    -------
 122    A bool indicating whether the string was able to be cast to an integer.
 123
 124    """
 125    try:
 126        return float(s).is_integer()
 127    except Exception:
 128        return False
 129
 130
 131def is_uuid(s: str) -> bool:
 132    """
 133    Check if a string is a valid UUID.
 134
 135    Parameters
 136    ----------
 137    s: str
 138        The string to be checked.
 139
 140    Returns
 141    -------
 142    A bool indicating whether the string is a valid UUID.
 143    """
 144    import uuid
 145    try:
 146        uuid.UUID(str(s))
 147        return True
 148    except Exception:
 149        return False
 150
 151
 152def string_to_dict(params_string: str) -> Dict[str, Any]:
 153    """
 154    Parse a string into a dictionary.
 155    
 156    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 157
 158    Parameters
 159    ----------
 160    params_string: str
 161        The string to be parsed.
 162        
 163    Returns
 164    -------
 165    The parsed dictionary.
 166
 167    Examples
 168    --------
 169    >>> string_to_dict("a:1,b:2") 
 170    {'a': 1, 'b': 2}
 171    >>> string_to_dict('{"a": 1, "b": 2}')
 172    {'a': 1, 'b': 2}
 173
 174    """
 175    if not params_string:
 176        return {}
 177
 178    import json
 179
 180    ### Kind of a weird edge case.
 181    ### In the generated compose file, there is some weird escaping happening,
 182    ### so the string to be parsed starts and ends with a single quote.
 183    if (
 184        isinstance(params_string, str)
 185        and len(params_string) > 4
 186        and params_string[1] == "{"
 187        and params_string[-2] == "}"
 188    ):
 189        return json.loads(params_string[1:-1])
 190
 191    if str(params_string).startswith('{'):
 192        return json.loads(params_string)
 193
 194    import ast
 195    params_dict = {}
 196    
 197    items = []
 198    bracket_level = 0
 199    brace_level = 0
 200    current_item = ''
 201    in_quotes = False
 202    quote_char = ''
 203    
 204    i = 0
 205    while i < len(params_string):
 206        char = params_string[i]
 207        
 208        if in_quotes:
 209            if char == quote_char and (i == 0 or params_string[i-1] != '\\'):
 210                in_quotes = False
 211        else:
 212            if char in ('"', "'"):
 213                in_quotes = True
 214                quote_char = char
 215            elif char == '[':
 216                bracket_level += 1
 217            elif char == ']':
 218                bracket_level -= 1
 219            elif char == '{':
 220                brace_level += 1
 221            elif char == '}':
 222                brace_level -= 1
 223            elif char == ',' and bracket_level == 0 and brace_level == 0:
 224                items.append(current_item)
 225                current_item = ''
 226                i += 1
 227                continue
 228        
 229        current_item += char
 230        i += 1
 231        
 232    if current_item:
 233        items.append(current_item)
 234
 235    for param in items:
 236        param = param.strip()
 237        if not param:
 238            continue
 239
 240        _keys = param.split(":", maxsplit=1)
 241        if len(_keys) != 2:
 242            continue
 243
 244        keys = _keys[:-1]
 245        try:
 246            val = ast.literal_eval(_keys[-1])
 247        except Exception:
 248            val = str(_keys[-1])
 249
 250        c = params_dict
 251        for _k in keys[:-1]:
 252            try:
 253                k = ast.literal_eval(_k)
 254            except Exception:
 255                k = str(_k)
 256            if k not in c:
 257                c[k] = {}
 258            c = c[k]
 259
 260        c[keys[-1]] = val
 261
 262    return params_dict
 263
 264
 265def to_simple_dict(doc: Dict[str, Any]) -> str:
 266    """
 267    Serialize a document dictionary in simple-dict format.
 268    """
 269    import json
 270    import ast
 271    from meerschaum.utils.dtypes import json_serialize_value
 272
 273    def serialize_value(value):
 274        if isinstance(value, str):
 275            try:
 276                evaluated = ast.literal_eval(value)
 277                if not isinstance(evaluated, str):
 278                    return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
 279                return value
 280            except (ValueError, SyntaxError, TypeError, MemoryError):
 281                return value
 282        
 283        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
 284
 285    return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items())
 286
 287
 288def parse_config_substitution(
 289    value: str,
 290    leading_key: str = 'MRSM',
 291    begin_key: str = '{',
 292    end_key: str = '}',
 293    delimeter: str = ':',
 294) -> List[Any]:
 295    """
 296    Parse Meerschaum substitution syntax
 297    E.g. MRSM{value1:value2} => ['value1', 'value2']
 298    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 299    """
 300    if not value.beginswith(leading_key):
 301        return value
 302
 303    return leading_key[len(leading_key):][len():-1].split(delimeter)
 304
 305
 306def edit_file(
 307    path: Union['pathlib.Path', str],
 308    default_editor: str = 'pyvim',
 309    debug: bool = False
 310) -> bool:
 311    """
 312    Open a file for editing.
 313
 314    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 315
 316    Parameters
 317    ----------
 318    path: Union[pathlib.Path, str]
 319        The path to the file to be edited.
 320
 321    default_editor: str, default 'pyvim'
 322        If `$EDITOR` is not set, use this instead.
 323        If `pyvim` is not installed, it will install it from PyPI.
 324
 325    debug: bool, default False
 326        Verbosity toggle.
 327
 328    Returns
 329    -------
 330    A bool indicating the file was successfully edited.
 331    """
 332    import os
 333    from subprocess import call
 334    from meerschaum.utils.debug import dprint
 335    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 336    try:
 337        EDITOR = os.environ.get('EDITOR', default_editor)
 338        if debug:
 339            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 340        rc = call([EDITOR, path])
 341    except Exception as e: ### can't open with default editors
 342        if debug:
 343            dprint(str(e))
 344            dprint('Failed to open file with system editor. Falling back to pyvim...')
 345        pyvim = attempt_import('pyvim', lazy=False)
 346        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 347    return rc == 0
 348
 349
 350def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 351    """
 352    Determine the columns and lines in the terminal.
 353    If they cannot be determined, return the default values (100 columns and 120 lines).
 354
 355    Parameters
 356    ----------
 357    default_cols: int, default 100
 358        If the columns cannot be determined, return this value.
 359
 360    default_lines: int, default 120
 361        If the lines cannot be determined, return this value.
 362
 363    Returns
 364    -------
 365    A tuple if integers for the columns and lines.
 366    """
 367    import os
 368    try:
 369        size = os.get_terminal_size()
 370        _cols, _lines = size.columns, size.lines
 371    except Exception:
 372        _cols, _lines = (
 373            int(os.environ.get('COLUMNS', str(default_cols))),
 374            int(os.environ.get('LINES', str(default_lines))),
 375        )
 376    return _cols, _lines
 377
 378
 379def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 380    """
 381    Iterate over a list in chunks.
 382    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 383
 384    Parameters
 385    ----------
 386    iterable: Iterable[Any]
 387        The iterable to iterate over in chunks.
 388        
 389    chunksize: int
 390        The size of chunks to iterate with.
 391        
 392    fillvalue: Optional[Any], default None
 393        If the chunks do not evenly divide into the iterable, pad the end with this value.
 394
 395    Returns
 396    -------
 397    A generator of tuples of size `chunksize`.
 398
 399    """
 400    from itertools import zip_longest
 401    args = [iter(iterable)] * chunksize
 402    return zip_longest(*args, fillvalue=fillvalue)
 403
 404def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 405    """
 406    Sort a dictionary's values and return a new dictionary.
 407
 408    Parameters
 409    ----------
 410    d: Dict[Any, Any]
 411        The dictionary to be sorted.
 412
 413    Returns
 414    -------
 415    A sorted dictionary.
 416
 417    Examples
 418    --------
 419    >>> sorted_dict({'b': 1, 'a': 2})
 420    {'b': 1, 'a': 2}
 421    >>> sorted_dict({'b': 2, 'a': 1})
 422    {'a': 1, 'b': 2}
 423
 424    """
 425    try:
 426        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 427    except Exception:
 428        return d
 429
 430def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
 431    """
 432    Convert the standard pipes dictionary into a list.
 433
 434    Parameters
 435    ----------
 436    pipes_dict: PipesDict
 437        The pipes dictionary to be flattened.
 438
 439    Returns
 440    -------
 441    A list of `Pipe` objects.
 442
 443    """
 444    pipes_list = []
 445    for ck in pipes_dict.values():
 446        for mk in ck.values():
 447            pipes_list += list(mk.values())
 448    return pipes_list
 449
 450
 451def timed_input(
 452    seconds: int = 10,
 453    timeout_message: str = "",
 454    prompt: str = "",
 455    icon: bool = False,
 456    **kw
 457) -> Union[str, None]:
 458    """
 459    Accept user input only for a brief period of time.
 460
 461    Parameters
 462    ----------
 463    seconds: int, default 10
 464        The number of seconds to wait.
 465
 466    timeout_message: str, default ''
 467        The message to print after the window has elapsed.
 468
 469    prompt: str, default ''
 470        The prompt to print during the window.
 471
 472    icon: bool, default False
 473        If `True`, print the configured input icon.
 474
 475
 476    Returns
 477    -------
 478    The input string entered by the user.
 479
 480    """
 481    import signal, time
 482
 483    class TimeoutExpired(Exception):
 484        """Raise this exception when the timeout is reached."""
 485
 486    def alarm_handler(signum, frame):
 487        raise TimeoutExpired
 488
 489    # set signal handler
 490    signal.signal(signal.SIGALRM, alarm_handler)
 491    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 492
 493    try:
 494        return input(prompt)
 495    except TimeoutExpired:
 496        return None
 497    except (EOFError, RuntimeError):
 498        try:
 499            print(prompt)
 500            time.sleep(seconds)
 501        except TimeoutExpired:
 502            return None
 503    finally:
 504        signal.alarm(0) # cancel alarm
 505
 506
 507def enforce_gevent_monkey_patch():
 508    """
 509    Check if gevent monkey patching is enabled, and if not, then apply patching.
 510    """
 511    from meerschaum.utils.packages import attempt_import
 512    import socket
 513    gevent, gevent_socket, gevent_monkey = attempt_import(
 514        'gevent', 'gevent.socket', 'gevent.monkey'
 515    )
 516    if not socket.socket is gevent_socket.socket:
 517        gevent_monkey.patch_all()
 518
 519def is_valid_email(email: str) -> Union['re.Match', None]:
 520    """
 521    Check whether a string is a valid email.
 522
 523    Parameters
 524    ----------
 525    email: str
 526        The string to be examined.
 527        
 528    Returns
 529    -------
 530    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 531
 532    Examples
 533    --------
 534    >>> is_valid_email('foo')
 535    >>> is_valid_email('foo@foo.com')
 536    <re.Match object; span=(0, 11), match='foo@foo.com'>
 537
 538    """
 539    import re
 540    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 541    return re.search(regex, email)
 542
 543
 544def string_width(string: str, widest: bool = True) -> int:
 545    """
 546    Calculate the width of a string, either by its widest or last line.
 547
 548    Parameters
 549    ----------
 550    string: str:
 551        The string to be examined.
 552        
 553    widest: bool, default True
 554        No longer used because `widest` is always assumed to be true.
 555
 556    Returns
 557    -------
 558    An integer for the text's visual width.
 559
 560    Examples
 561    --------
 562    >>> string_width('a')
 563    1
 564    >>> string_width('a\\nbc\\nd')
 565    2
 566
 567    """
 568    def _widest():
 569        words = string.split('\n')
 570        max_length = 0
 571        for w in words:
 572            length = len(w)
 573            if length > max_length:
 574                max_length = length
 575        return max_length
 576
 577    return _widest()
 578
 579def _pyinstaller_traverse_dir(
 580    directory: str,
 581    ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 582    include_dotfiles: bool = False
 583) -> list:
 584    """
 585    Recursively traverse a directory and return a list of its contents.
 586    """
 587    import os, pathlib
 588    paths = []
 589    _directory = pathlib.Path(directory)
 590
 591    def _found_pattern(name: str):
 592        for pattern in ignore_patterns:
 593            if pattern.replace('/', os.path.sep) in str(name):
 594                return True
 595        return False
 596
 597    for root, dirs, files in os.walk(_directory):
 598        _root = str(root)[len(str(_directory.parent)):]
 599        if _root.startswith(os.path.sep):
 600            _root = _root[len(os.path.sep):]
 601        if _root.startswith('.') and not include_dotfiles:
 602            continue
 603        ### ignore certain patterns
 604        if _found_pattern(_root):
 605            continue
 606
 607        for filename in files:
 608            if filename.startswith('.') and not include_dotfiles:
 609                continue
 610            path = os.path.join(root, filename)
 611            if _found_pattern(path):
 612                continue
 613
 614            _path = str(path)[len(str(_directory.parent)):]
 615            if _path.startswith(os.path.sep):
 616                _path = _path[len(os.path.sep):]
 617            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 618
 619            paths.append((path, _path))
 620    return paths
 621
 622
 623def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
 624    """
 625    Get a value from a dictionary with a tuple of keys.
 626
 627    Parameters
 628    ----------
 629    d: Dict[Any, Any]
 630        The dictionary to search.
 631
 632    path: Tuple[Any, ...]
 633        The path of keys to traverse.
 634
 635    Returns
 636    -------
 637    The value from the end of the path.
 638    """
 639    return functools.reduce(lambda di, key: di[key], path, d)
 640
 641
 642def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
 643    """
 644    Set a value in a dictionary with a tuple of keys.
 645
 646    Parameters
 647    ----------
 648    d: Dict[Any, Any]
 649        The dictionary to search.
 650
 651    path: Tuple[Any, ...]
 652        The path of keys to traverse.
 653
 654    val: Any
 655        The value to set at the end of the path.
 656    """
 657    get_val_from_dict_path(d, path[:-1])[path[-1]] = val
 658
 659
 660def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 661    """
 662    Recursively replace passwords in a dictionary.
 663
 664    Parameters
 665    ----------
 666    d: Dict[str, Any]
 667        The dictionary to search through.
 668
 669    replace_with: str, default '*'
 670        The string to replace each character of the password with.
 671
 672    Returns
 673    -------
 674    Another dictionary where values to the keys `'password'`
 675    are replaced with `replace_with` (`'*'`).
 676
 677    Examples
 678    --------
 679    >>> replace_password({'a': 1})
 680    {'a': 1}
 681    >>> replace_password({'password': '123'})
 682    {'password': '***'}
 683    >>> replace_password({'nested': {'password': '123'}})
 684    {'nested': {'password': '***'}}
 685    >>> replace_password({'password': '123'}, replace_with='!')
 686    {'password': '!!!'}
 687
 688    """
 689    import copy
 690    _d = copy.deepcopy(d)
 691    for k, v in d.items():
 692        if isinstance(v, dict):
 693            _d[k] = replace_password(v)
 694        elif 'password' in str(k).lower():
 695            _d[k] = ''.join([replace_with for char in str(v)])
 696        elif str(k).lower() == 'uri':
 697            from meerschaum.connectors.sql import SQLConnector
 698            try:
 699                uri_params = SQLConnector.parse_uri(v)
 700            except Exception:
 701                uri_params = None
 702            if not uri_params:
 703                continue
 704            if not 'username' in uri_params or not 'password' in uri_params:
 705                continue
 706            _d[k] = v.replace(
 707                uri_params['username'] + ':' + uri_params['password'],
 708                uri_params['username'] + ':' + ''.join(
 709                    [replace_with for char in str(uri_params['password'])]
 710                )
 711            )
 712    return _d
 713
 714
 715def filter_arguments(
 716    func: Callable[[Any], Any],
 717    *args: Any,
 718    **kwargs: Any
 719) -> Tuple[Tuple[Any], Dict[str, Any]]:
 720    """
 721    Filter out unsupported positional and keyword arguments.
 722
 723    Parameters
 724    ----------
 725    func: Callable[[Any], Any]
 726        The function to inspect.
 727
 728    *args: Any
 729        Positional arguments to filter and pass to `func`.
 730
 731    **kwargs
 732        Keyword arguments to filter and pass to `func`.
 733
 734    Returns
 735    -------
 736    The `args` and `kwargs` accepted by `func`.
 737    """
 738    args = filter_positionals(func, *args)
 739    kwargs = filter_keywords(func, **kwargs)
 740    return args, kwargs
 741
 742
 743def filter_keywords(
 744    func: Callable[[Any], Any],
 745    **kw: Any
 746) -> Dict[str, Any]:
 747    """
 748    Filter out unsupported keyword arguments.
 749
 750    Parameters
 751    ----------
 752    func: Callable[[Any], Any]
 753        The function to inspect.
 754
 755    **kw: Any
 756        The arguments to be filtered and passed into `func`.
 757
 758    Returns
 759    -------
 760    A dictionary of keyword arguments accepted by `func`.
 761
 762    Examples
 763    --------
 764    ```python
 765    >>> def foo(a=1, b=2):
 766    ...     return a * b
 767    >>> filter_keywords(foo, a=2, b=4, c=6)
 768    {'a': 2, 'b': 4}
 769    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 770    8
 771    ```
 772
 773    """
 774    import inspect
 775    func_params = inspect.signature(func).parameters
 776    ### If the function has a **kw method, skip filtering.
 777    for param, _type in func_params.items():
 778        if '**' in str(_type):
 779            return kw
 780    return {k: v for k, v in kw.items() if k in func_params}
 781
 782
 783def filter_positionals(
 784    func: Callable[[Any], Any],
 785    *args: Any
 786) -> Tuple[Any]:
 787    """
 788    Filter out unsupported positional arguments.
 789
 790    Parameters
 791    ----------
 792    func: Callable[[Any], Any]
 793        The function to inspect.
 794
 795    *args: Any
 796        The arguments to be filtered and passed into `func`.
 797        NOTE: If the function signature expects more arguments than provided,
 798        the missing slots will be filled with `None`.
 799
 800    Returns
 801    -------
 802    A tuple of positional arguments accepted by `func`.
 803
 804    Examples
 805    --------
 806    ```python
 807    >>> def foo(a, b):
 808    ...     return a * b
 809    >>> filter_positionals(foo, 2, 4, 6)
 810    (2, 4)
 811    >>> foo(*filter_positionals(foo, 2, 4, 6))
 812    8
 813    ```
 814
 815    """
 816    import inspect
 817    from meerschaum.utils.warnings import warn
 818    func_params = inspect.signature(func).parameters
 819    acceptable_args: List[Any] = []
 820
 821    def _warn_invalids(_num_invalid):
 822        if _num_invalid > 0:
 823            warn(
 824                "Too few arguments were provided. "
 825                + f"{_num_invalid} argument"
 826                + ('s have ' if _num_invalid != 1 else " has ")
 827                + " been filled with `None`.",
 828            )
 829
 830    num_invalid: int = 0
 831    for i, (param, val) in enumerate(func_params.items()):
 832        if '=' in str(val) or '*' in str(val):
 833            _warn_invalids(num_invalid)
 834            return tuple(acceptable_args)
 835
 836        try:
 837            acceptable_args.append(args[i])
 838        except IndexError:
 839            acceptable_args.append(None)
 840            num_invalid += 1
 841
 842    _warn_invalids(num_invalid)
 843    return tuple(acceptable_args)
 844
 845
 846def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 847    """
 848    Convert an ordered dict to a dict.
 849    Does not mutate the original OrderedDict.
 850    """
 851    from collections import OrderedDict
 852    _d = dict(od)
 853    for k, v in od.items():
 854        if isinstance(v, OrderedDict) or (
 855            issubclass(type(v), OrderedDict)
 856        ):
 857            _d[k] = dict_from_od(v)
 858    return _d
 859
 860
 861def remove_ansi(s: str) -> str:
 862    """
 863    Remove ANSI escape characters from a string.
 864
 865    Parameters
 866    ----------
 867    s: str:
 868        The string to be cleaned.
 869
 870    Returns
 871    -------
 872    A string with the ANSI characters removed.
 873
 874    Examples
 875    --------
 876    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 877    'Hello, World!'
 878
 879    """
 880    import re
 881    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 882
 883
 884def get_connector_labels(
 885    *types: str,
 886    search_term: str = '',
 887    ignore_exact_match = True,
 888    _additional_options: Optional[List[str]] = None,
 889) -> List[str]:
 890    """
 891    Read connector labels from the configuration dictionary.
 892
 893    Parameters
 894    ----------
 895    *types: str
 896        The connector types.
 897        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 898
 899    search_term: str, default ''
 900        A filter on the connectors' labels.
 901
 902    ignore_exact_match: bool, default True
 903        If `True`, skip a connector if the search_term is an exact match.
 904
 905    Returns
 906    -------
 907    A list of the keys of defined connectors.
 908
 909    """
 910    from meerschaum.config import get_config
 911    connectors = get_config('meerschaum', 'connectors')
 912
 913    _types = list(types)
 914    if len(_types) == 0:
 915        _types = list(connectors.keys()) + ['plugin']
 916
 917    conns = []
 918    for t in _types:
 919        if t == 'plugin':
 920            from meerschaum.plugins import get_data_plugins
 921            conns += [
 922                f'{t}:' + plugin.module.__name__.split('.')[-1]
 923                for plugin in get_data_plugins()
 924            ]
 925            continue
 926        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 927
 928    if _additional_options:
 929        conns += _additional_options
 930
 931    possibilities = [
 932        c
 933        for c in conns
 934        if c.startswith(search_term)
 935            and c != (
 936                search_term if ignore_exact_match else ''
 937            )
 938    ]
 939    return sorted(possibilities)
 940
 941
 942def wget(
 943    url: str,
 944    dest: Optional[Union[str, 'pathlib.Path']] = None,
 945    headers: Optional[Dict[str, Any]] = None,
 946    color: bool = True,
 947    debug: bool = False,
 948    **kw: Any
 949) -> 'pathlib.Path':
 950    """
 951    Mimic `wget` with `requests`.
 952
 953    Parameters
 954    ----------
 955    url: str
 956        The URL to the resource to be downloaded.
 957
 958    dest: Optional[Union[str, pathlib.Path]], default None
 959        The destination path of the downloaded file.
 960        If `None`, save to the current directory.
 961
 962    color: bool, default True
 963        If `debug` is `True`, print color output.
 964
 965    debug: bool, default False
 966        Verbosity toggle.
 967
 968    Returns
 969    -------
 970    The path to the downloaded file.
 971
 972    """
 973    from meerschaum.utils.warnings import warn, error
 974    from meerschaum.utils.debug import dprint
 975    import os, pathlib, re, urllib.request
 976    if headers is None:
 977        headers = {}
 978    request = urllib.request.Request(url, headers=headers)
 979    if not color:
 980        dprint = print
 981    if debug:
 982        dprint(f"Downloading from '{url}'...")
 983    try:
 984        response = urllib.request.urlopen(request)
 985    except Exception as e:
 986        import ssl
 987        ssl._create_default_https_context = ssl._create_unverified_context
 988        try:
 989            response = urllib.request.urlopen(request)
 990        except Exception as _e:
 991            print(_e)
 992            response = None
 993    if response is None or response.code != 200:
 994        error_msg = f"Failed to download from '{url}'."
 995        if color:
 996            error(error_msg)
 997        else:
 998            print(error_msg)
 999            import sys
1000            sys.exit(1)
1001
1002    d = response.headers.get('content-disposition', None)
1003    fname = (
1004        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1005        else url.split('/')[-1]
1006    )
1007
1008    if dest is None:
1009        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1010    elif isinstance(dest, str):
1011        dest = pathlib.Path(dest)
1012
1013    with open(dest, 'wb') as f:
1014        f.write(response.fp.read())
1015
1016    if debug:
1017        dprint(f"Downloaded file '{dest}'.")
1018
1019    return dest
1020
1021
1022def async_wrap(func):
1023    """
1024    Run a synchronous function as async.
1025    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1026    """
1027    import asyncio
1028    from functools import wraps, partial
1029
1030    @wraps(func)
1031    async def run(*args, loop=None, executor=None, **kwargs):
1032        if loop is None:
1033            loop = asyncio.get_event_loop()
1034        pfunc = partial(func, *args, **kwargs)
1035        return await loop.run_in_executor(executor, pfunc)
1036    return run
1037
1038
1039def debug_trace(browser: bool = True):
1040    """
1041    Open a web-based debugger to trace the execution of the program.
1042
1043    This is an alias import for `meerschaum.utils.debug.debug_trace`.
1044    """
1045    from meerschaum.utils.debug import trace
1046    trace(browser=browser)
1047
1048
1049def items_str(
1050    items: List[Any],
1051    quotes: bool = True,
1052    quote_str: str = "'",
1053    commas: bool = True,
1054    comma_str: str = ',',
1055    and_: bool = True,
1056    and_str: str = 'and',
1057    oxford_comma: bool = True,
1058    spaces: bool = True,
1059    space_str = ' ',
1060) -> str:
1061    """
1062    Return a formatted string if list items separated by commas.
1063
1064    Parameters
1065    ----------
1066    items: [List[Any]]
1067        The items to be printed as an English list.
1068
1069    quotes: bool, default True
1070        If `True`, wrap items in quotes.
1071
1072    quote_str: str, default "'"
1073        If `quotes` is `True`, prepend and append each item with this string.
1074
1075    and_: bool, default True
1076        If `True`, include the word 'and' before the final item in the list.
1077
1078    and_str: str, default 'and'
1079        If `and_` is True, insert this string where 'and' normally would in and English list.
1080
1081    oxford_comma: bool, default True
1082        If `True`, include the Oxford Comma (comma before the final 'and').
1083        Only applies when `and_` is `True`.
1084
1085    spaces: bool, default True
1086        If `True`, separate items with `space_str`
1087
1088    space_str: str, default ' '
1089        If `spaces` is `True`, separate items with this string.
1090
1091    Returns
1092    -------
1093    A string of the items as an English list.
1094
1095    Examples
1096    --------
1097    >>> items_str([1,2,3])
1098    "'1', '2', and '3'"
1099    >>> items_str([1,2,3], quotes=False)
1100    '1, 2, and 3'
1101    >>> items_str([1,2,3], and_=False)
1102    "'1', '2', '3'"
1103    >>> items_str([1,2,3], spaces=False, and_=False)
1104    "'1','2','3'"
1105    >>> items_str([1,2,3], oxford_comma=False)
1106    "'1', '2' and '3'"
1107    >>> items_str([1,2,3], quote_str=":")
1108    ':1:, :2:, and :3:'
1109    >>> items_str([1,2,3], and_str="or")
1110    "'1', '2', or '3'"
1111    >>> items_str([1,2,3], space_str="_")
1112    "'1',_'2',_and_'3'"
1113
1114    """
1115    if not items:
1116        return ''
1117    
1118    q = quote_str if quotes else ''
1119    s = space_str if spaces else ''
1120    a = and_str if and_ else ''
1121    c = comma_str if commas else ''
1122
1123    if len(items) == 1:
1124        return q + str(list(items)[0]) + q
1125
1126    if len(items) == 2:
1127        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1128
1129    sep = q + c + s + q
1130    output = q + sep.join(str(i) for i in items[:-1]) + q
1131    if oxford_comma:
1132        output += c
1133    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1134    return output
1135
1136
1137def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
1138    """
1139    Return a human-readable string for a `timedelta` (or `int` minutes).
1140
1141    Parameters
1142    ----------
1143    delta: Union[timedelta, int]
1144        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
1145
1146    round_unit: bool, default False
1147        If `True`, round the output to a single unit.
1148
1149    Returns
1150    -------
1151    A formatted string, fit for human eyes.
1152    """
1153    from meerschaum.utils.packages import attempt_import
1154    if is_int(str(delta)) and not round_unit:
1155        return str(delta)
1156
1157    humanfriendly = attempt_import('humanfriendly', lazy=False)
1158    delta_seconds = (
1159        delta.total_seconds()
1160        if hasattr(delta, 'total_seconds')
1161        else (delta * 60)
1162    )
1163
1164    is_negative = delta_seconds < 0
1165    delta_seconds = abs(delta_seconds)
1166    replace_units = {}
1167
1168    if round_unit:
1169        if delta_seconds < 1:
1170            delta_seconds = round(delta_seconds, 2)
1171        elif delta_seconds < 60:
1172            delta_seconds = int(delta_seconds)
1173        elif delta_seconds < 3600:
1174            delta_seconds = int(delta_seconds / 60) * 60
1175        elif delta_seconds < 86400:
1176            delta_seconds = int(delta_seconds / 3600) * 3600
1177        elif delta_seconds < (86400 * 7):
1178            delta_seconds = int(delta_seconds / 86400) * 86400
1179        elif delta_seconds < (86400 * 7 * 4):
1180            delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7)
1181        elif delta_seconds < (86400 * 7 * 4 * 13):
1182            delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7)
1183            replace_units['weeks'] = 'months'
1184        else:
1185            delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364)
1186
1187    delta_str = humanfriendly.format_timespan(delta_seconds)
1188    if ',' in delta_str and round_unit:
1189        delta_str = delta_str.split(',')[0]
1190    elif ' and ' in delta_str and round_unit:
1191        delta_str = delta_str.split(' and ')[0]
1192
1193    for parsed_unit, replacement_unit in replace_units.items():
1194        delta_str = delta_str.replace(parsed_unit, replacement_unit)
1195
1196    return delta_str + (' ago' if is_negative else '')
1197
1198
1199def is_docker_available() -> bool:
1200    """Check if we can connect to the Docker engine."""
1201    import subprocess
1202    try:
1203        has_docker = subprocess.call(
1204            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1205        ) == 0
1206    except Exception:
1207        has_docker = False
1208    return has_docker
1209
1210
1211def is_android() -> bool:
1212    """Return `True` if the current platform is Android."""
1213    import sys
1214    return hasattr(sys, 'getandroidapilevel')
1215
1216
1217def is_bcp_available() -> bool:
1218    """Check if the MSSQL `bcp` utility is installed."""
1219    import subprocess
1220
1221    try:
1222        has_bcp = subprocess.call(
1223            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1224        ) == 0
1225    except Exception:
1226        has_bcp = False
1227    return has_bcp
1228
1229
1230def is_systemd_available() -> bool:
1231    """Check if running on systemd."""
1232    import subprocess
1233    try:
1234        has_systemctl = subprocess.call(
1235            ['systemctl', 'whoami'],
1236            stdout=subprocess.DEVNULL,
1237            stderr=subprocess.STDOUT,
1238        ) == 0
1239    except FileNotFoundError:
1240        has_systemctl = False
1241    except Exception:
1242        import traceback
1243        traceback.print_exc()
1244        has_systemctl = False
1245    return has_systemctl
1246
1247
1248def is_tmux_available() -> bool:
1249    """
1250    Check if `tmux` is installed.
1251    """
1252    import subprocess
1253    try:
1254        has_tmux = subprocess.call(
1255            ['tmux', '-V'],
1256            stdout=subprocess.DEVNULL,
1257            stderr=subprocess.STDOUT
1258        ) == 0
1259    except FileNotFoundError:
1260        has_tmux = False
1261    except Exception:
1262        has_tmux = False
1263    return has_tmux
1264
1265def get_last_n_lines(file_name: str, N: int):
1266    """
1267    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/
1268    """
1269    import os
1270    # Create an empty list to keep the track of last N lines
1271    list_of_lines = []
1272    # Open file for reading in binary mode
1273    with open(file_name, 'rb') as read_obj:
1274        # Move the cursor to the end of the file
1275        read_obj.seek(0, os.SEEK_END)
1276        # Create a buffer to keep the last read line
1277        buffer = bytearray()
1278        # Get the current position of pointer i.e eof
1279        pointer_location = read_obj.tell()
1280        # Loop till pointer reaches the top of the file
1281        while pointer_location >= 0:
1282            # Move the file pointer to the location pointed by pointer_location
1283            read_obj.seek(pointer_location)
1284            # Shift pointer location by -1
1285            pointer_location = pointer_location -1
1286            # read that byte / character
1287            new_byte = read_obj.read(1)
1288            # If the read byte is new line character then it means one line is read
1289            if new_byte == b'\n':
1290                # Save the line in list of lines
1291                list_of_lines.append(buffer.decode()[::-1])
1292                # If the size of list reaches N, then return the reversed list
1293                if len(list_of_lines) == N:
1294                    return list(reversed(list_of_lines))
1295                # Reinitialize the byte array to save next line
1296                buffer = bytearray()
1297            else:
1298                # If last read character is not eol then add it in buffer
1299                buffer.extend(new_byte)
1300        # As file is read completely, if there is still data in buffer, then its first line.
1301        if len(buffer) > 0:
1302            list_of_lines.append(buffer.decode()[::-1])
1303    # return the reversed list
1304    return list(reversed(list_of_lines))
1305
1306
1307def tail(f, n, offset=None):
1308    """
1309    https://stackoverflow.com/a/692616/9699829
1310    
1311    Reads n lines from f with an offset of offset lines.  The return
1312    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
1313    an indicator that is `True` if there are more lines in the file.
1314    """
1315    avg_line_length = 74
1316    to_read = n + (offset or 0)
1317
1318    while True:
1319        try:
1320            f.seek(-(avg_line_length * to_read), 2)
1321        except IOError:
1322            # woops.  apparently file is smaller than what we want
1323            # to step back, go to the beginning instead
1324            f.seek(0)
1325        pos = f.tell()
1326        lines = f.read().splitlines()
1327        if len(lines) >= to_read or pos == 0:
1328            return lines[-to_read:offset and -offset or None], \
1329                   len(lines) > to_read or pos > 0
1330        avg_line_length *= 1.3
1331
1332
1333def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1334    """
1335    Remove characters from each section of a string until the length is within the limit.
1336
1337    Parameters
1338    ----------
1339    item: str
1340        The item name to be truncated.
1341
1342    delimeter: str, default '_'
1343        Split `item` by this string into several sections.
1344
1345    max_len: int, default 128
1346        The max acceptable length of the truncated version of `item`.
1347
1348    Returns
1349    -------
1350    The truncated string.
1351
1352    Examples
1353    --------
1354    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1355    'ab_de_gh'
1356
1357    """
1358    if len(item) < max_len:
1359        return item
1360
1361    def _shorten(s: str) -> str:
1362        return s[:-1] if len(s) > 1 else s
1363
1364    sections = list(enumerate(item.split('_')))
1365    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1366    available_chars = max_len - len(sections)
1367
1368    _sections = [(i, s) for i, s in sorted_sections]
1369    _sections_len = sum([len(s) for i, s in _sections])
1370    _old_sections_len = _sections_len
1371    while _sections_len > available_chars:
1372        _sections = [(i, _shorten(s)) for i, s in _sections]
1373        _old_sections_len = _sections_len
1374        _sections_len = sum([len(s) for i, s in _sections])
1375        if _old_sections_len == _sections_len:
1376            raise Exception(f"String could not be truncated: '{item}'")
1377
1378    new_sections = sorted(_sections, key=lambda x: x[0])
1379    return delimeter.join([s for i, s in new_sections])
1380
1381
1382def truncate_text_for_display(
1383    text: str,
1384    max_length: int = 50,
1385    suffix: str = '…',
1386) -> str:
1387    """
1388    Truncate a potentially long string for display purposes.
1389
1390    Parameters
1391    ----------
1392    text: str
1393        The string to be truncated.
1394
1395    max_length: int, default 60
1396        The maximum length of `text` before truncation.
1397
1398    suffix: str, default '…'
1399        The string to append to the length of `text` to indicate truncation.
1400
1401    Returns
1402    -------
1403    A string of length `max_length` or less.
1404    """
1405    text_length = len(text)
1406    if text_length <= max_length:
1407        return text
1408
1409    suffix_length = len(suffix)
1410
1411    truncated_text = text[:max_length - suffix_length]
1412    return truncated_text + suffix
1413
1414
1415def separate_negation_values(
1416    vals: Union[List[str], Tuple[str]],
1417    negation_prefix: Optional[str] = None,
1418) -> Tuple[List[str], List[str]]:
1419    """
1420    Separate the negated values from the positive ones.
1421    Return two lists: positive and negative values.
1422
1423    Parameters
1424    ----------
1425    vals: Union[List[str], Tuple[str]]
1426        A list of strings to parse.
1427
1428    negation_prefix: Optional[str], default None
1429        Include values that start with this string in the second list.
1430        If `None`, use the system default (`_`).
1431    """
1432    if negation_prefix is None:
1433        from meerschaum._internal.static import STATIC_CONFIG
1434        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
1435    _in_vals, _ex_vals = [], []
1436    for v in vals:
1437        if str(v).startswith(negation_prefix):
1438            _ex_vals.append(str(v)[len(negation_prefix):])
1439        else:
1440            _in_vals.append(v)
1441
1442    return _in_vals, _ex_vals
1443
1444
1445def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1446    """
1447    Translate a params dictionary into lists of include- and exclude-values.
1448
1449    Parameters
1450    ----------
1451    params: Optional[Dict[str, Any]]
1452        A params query dictionary.
1453
1454    Returns
1455    -------
1456    A dictionary mapping keys to a tuple of lists for include and exclude values.
1457
1458    Examples
1459    --------
1460    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
1461    {'a': (['b', 'c', 'e'], ['d', 'f'])}
1462    """
1463    if not params:
1464        return {}
1465    return {
1466        col: separate_negation_values(
1467            (
1468                val
1469                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
1470                else [val]
1471            )
1472        )
1473        for col, val in params.items()
1474    }
1475
1476
1477def flatten_list(list_: List[Any]) -> List[Any]:
1478    """
1479    Recursively flatten a list.
1480    """
1481    for item in list_:
1482        if isinstance(item, list):
1483            yield from flatten_list(item)
1484        else:
1485            yield item
1486
1487
1488def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1489    """
1490    Parse a string containing the text to be passed into a function
1491    and return a tuple of args, kwargs.
1492
1493    Parameters
1494    ----------
1495    args_str: str
1496        The contents of the function parameter (as a string).
1497
1498    Returns
1499    -------
1500    A tuple of args (tuple) and kwargs (dict[str, Any]).
1501
1502    Examples
1503    --------
1504    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
1505    (123, 456), {'foo': 789, 'bar': 'baz'}
1506    """
1507    import ast
1508    args = []
1509    kwargs = {}
1510
1511    for part in args_str.split(','):
1512        if '=' in part:
1513            key, val = part.split('=', 1)
1514            kwargs[key.strip()] = ast.literal_eval(val)
1515        else:
1516            args.append(ast.literal_eval(part.strip()))
1517
1518    return tuple(args), kwargs
1519
1520
1521def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
1522    """
1523    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.
1524
1525    Parameters
1526    ----------
1527    src_path: pathlib.Path
1528        The source path.
1529
1530    dest_path: pathlib.Path
1531        The destination path.
1532
1533    Returns
1534    -------
1535    A SuccessTuple indicating success.
1536    """
1537    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
1538        return True, "Symlink already exists."
1539    try:
1540        dest_path.symlink_to(src_path)
1541        success = True
1542    except Exception as e:
1543        success = False
1544        msg = str(e)
1545    if success:
1546        return success, "Success"
1547
1548    ### Failed to create a symlink.
1549    ### If we're not on Windows, return an error.
1550    import platform
1551    if platform.system() != 'Windows':
1552        return success, msg
1553
1554    try:
1555        import _winapi
1556    except ImportError:
1557        return False, "Unable to import _winapi."
1558
1559    if src_path.is_dir():
1560        try:
1561            _winapi.CreateJunction(str(src_path), str(dest_path))
1562        except Exception as e:
1563            return False, str(e)
1564        return True, "Success"
1565
1566    ### Last resort: copy the file on Windows.
1567    import shutil
1568    try:
1569        shutil.copy(src_path, dest_path)
1570    except Exception as e:
1571        return False, str(e)
1572
1573    return True, "Success"
1574
1575
1576def is_symlink(path: pathlib.Path) -> bool:
1577    """
1578    Wrap `path.is_symlink()` but add support for Windows junctions.
1579    """
1580    if path.is_symlink():
1581        return True
1582    import platform
1583    import os
1584    if platform.system() != 'Windows':
1585        return False
1586    try:
1587        return bool(os.readlink(path))
1588    except OSError:
1589        return False
1590
1591
1592def parametrized(dec):
1593    """
1594    A meta-decorator for allowing other decorator functions to have parameters.
1595
1596    https://stackoverflow.com/a/26151604/9699829
1597    """
1598    def layer(*args, **kwargs):
1599        def repl(f):
1600            return dec(f, *args, **kwargs)
1601        return repl
1602    return layer
1603
1604
1605def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
1606    """
1607    Safely extract a TAR file to a give directory.
1608    This defends against CVE-2007-4559.
1609
1610    Parameters
1611    ----------
1612    tarf: file
1613        The TAR file opened with `tarfile.open(path, 'r:gz')`.
1614
1615    output_dir: Union[str, pathlib.Path]
1616        The output directory.
1617    """
1618    import os
1619
1620    def is_within_directory(directory, target):
1621        abs_directory = os.path.abspath(directory)
1622        abs_target = os.path.abspath(target)
1623        prefix = os.path.commonprefix([abs_directory, abs_target])
1624        return prefix == abs_directory 
1625
1626    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
1627        for member in tar.getmembers():
1628            member_path = os.path.join(path, member.name)
1629            if not is_within_directory(path, member_path):
1630                raise Exception("Attempted Path Traversal in Tar File")
1631
1632        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)
1633
1634    return safe_extract(tarf, output_dir)
1635
1636
1637def to_snake_case(name: str) -> str:
1638    """
1639    Return the given string in snake-case-style.
1640
1641    Parameters
1642    ----------
1643    name: str
1644        The input text to convert to snake case.
1645
1646    Returns
1647    -------
1648    A snake-case version of `name`.
1649
1650    Examples
1651    --------
1652    >>> to_snake_case("HelloWorld!")
1653    'hello_world'
1654    >>> to_snake_case("This has spaces in it.")
1655    'this_has_spaces_in_it'
1656    >>> to_snake_case("already_in_snake_case")
1657    'already_in_snake_case'
1658    """
1659    import re
1660    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
1661    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
1662    name = re.sub(r'[^\w\s]', '', name)
1663    name = re.sub(r'\s+', '_', name)
1664    return name.lower()
1665
1666
1667##################
1668# Legacy imports #
1669##################
1670
1671def choose_subaction(*args, **kwargs) -> Any:
1672    """
1673    Placeholder function to prevent breaking legacy behavior.
1674    See `meerschaum.actions.choose_subaction`.
1675    """
1676    from meerschaum.actions import choose_subaction as _choose_subactions
1677    return _choose_subactions(*args, **kwargs)
1678
1679
1680def print_options(*args, **kwargs) -> None:
1681    """
1682    Placeholder function to prevent breaking legacy behavior.
1683    See `meerschaum.utils.formatting.print_options`.
1684    """
1685    from meerschaum.utils.formatting import print_options as _print_options
1686    return _print_options(*args, **kwargs)
1687
1688
1689def to_pandas_dtype(*args, **kwargs) -> Any:
1690    """
1691    Placeholder function to prevent breaking legacy behavior.
1692    See `meerschaum.utils.dtypes.to_pandas_dtype`.
1693    """
1694    from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype
1695    return _to_pandas_dtype(*args, **kwargs)
1696
1697
1698def filter_unseen_df(*args, **kwargs) -> Any:
1699    """
1700    Placeholder function to prevent breaking legacy behavior.
1701    See `meerschaum.utils.dataframe.filter_unseen_df`.
1702    """
1703    from meerschaum.utils.dataframe import filter_unseen_df as real_function
1704    return real_function(*args, **kwargs)
1705
1706
1707def add_missing_cols_to_df(*args, **kwargs) -> Any:
1708    """
1709    Placeholder function to prevent breaking legacy behavior.
1710    See `meerschaum.utils.dataframe.add_missing_cols_to_df`.
1711    """
1712    from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function
1713    return real_function(*args, **kwargs)
1714
1715
1716def parse_df_datetimes(*args, **kwargs) -> Any:
1717    """
1718    Placeholder function to prevent breaking legacy behavior.
1719    See `meerschaum.utils.dataframe.parse_df_datetimes`.
1720    """
1721    from meerschaum.utils.dataframe import parse_df_datetimes as real_function
1722    return real_function(*args, **kwargs)
1723
1724
1725def df_from_literal(*args, **kwargs) -> Any:
1726    """
1727    Placeholder function to prevent breaking legacy behavior.
1728    See `meerschaum.utils.dataframe.df_from_literal`.
1729    """
1730    from meerschaum.utils.dataframe import df_from_literal as real_function
1731    return real_function(*args, **kwargs)
1732
1733
1734def get_json_cols(*args, **kwargs) -> Any:
1735    """
1736    Placeholder function to prevent breaking legacy behavior.
1737    See `meerschaum.utils.dataframe.get_json_cols`.
1738    """
1739    from meerschaum.utils.dataframe import get_json_cols as real_function
1740    return real_function(*args, **kwargs)
1741
1742
1743def get_unhashable_cols(*args, **kwargs) -> Any:
1744    """
1745    Placeholder function to prevent breaking legacy behavior.
1746    See `meerschaum.utils.dataframe.get_unhashable_cols`.
1747    """
1748    from meerschaum.utils.dataframe import get_unhashable_cols as real_function
1749    return real_function(*args, **kwargs)
1750
1751
1752def enforce_dtypes(*args, **kwargs) -> Any:
1753    """
1754    Placeholder function to prevent breaking legacy behavior.
1755    See `meerschaum.utils.dataframe.enforce_dtypes`.
1756    """
1757    from meerschaum.utils.dataframe import enforce_dtypes as real_function
1758    return real_function(*args, **kwargs)
1759
1760
1761def get_datetime_bound_from_df(*args, **kwargs) -> Any:
1762    """
1763    Placeholder function to prevent breaking legacy behavior.
1764    See `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
1765    """
1766    from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function
1767    return real_function(*args, **kwargs)
1768
1769
1770def df_is_chunk_generator(*args, **kwargs) -> Any:
1771    """
1772    Placeholder function to prevent breaking legacy behavior.
1773    See `meerschaum.utils.dataframe.df_is_chunk_generator`.
1774    """
1775    from meerschaum.utils.dataframe import df_is_chunk_generator as real_function
1776    return real_function(*args, **kwargs)
1777
1778
1779def choices_docstring(*args, **kwargs) -> Any:
1780    """
1781    Placeholder function to prevent breaking legacy behavior.
1782    See `meerschaum.actions.choices_docstring`.
1783    """
1784    from meerschaum.actions import choices_docstring as real_function
1785    return real_function(*args, **kwargs)
1786
1787
1788def _get_subaction_names(*args, **kwargs) -> Any:
1789    """
1790    Placeholder function to prevent breaking legacy behavior.
1791    See `meerschaum.actions._get_subaction_names`.
1792    """
1793    from meerschaum.actions import _get_subaction_names as real_function
1794    return real_function(*args, **kwargs)
1795
1796
1797def json_serialize_datetime(dt: datetime) -> Union[str, None]:
1798    """
1799    Serialize a datetime object into JSON (ISO format string).
1800
1801    Examples
1802    --------
1803    >>> import json
1804    >>> from datetime import datetime
1805    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
1806    '{"a": "2022-01-01T00:00:00Z"}'
1807
1808    """
1809    from meerschaum.utils.dtypes import serialize_datetime
1810    return serialize_datetime(dt)
1811
1812
1813def replace_pipes_in_dict(*args, **kwargs):
1814    """
1815    Placeholder function to prevent breaking legacy behavior.
1816    See `meerschaum.utils.pipes.replace_pipes_in_dict`.
1817    """
1818    from meerschaum.utils.pipes import replace_pipes_in_dict
1819    return replace_pipes_in_dict(*args, **kwargs)
1820
1821
1822def is_pipe_registered(*args, **kwargs):
1823    """
1824    Placeholder function to prevent breaking legacy behavior.
1825    See `meerschaum.utils.pipes.is_pipe_registered`.
1826    """
1827    from meerschaum.utils.pipes import is_pipe_registered
1828    return is_pipe_registered(*args, **kwargs)
1829
1830
1831def round_time(*args, **kwargs):
1832    """
1833    Placeholder function to prevent breaking legacy behavior.
1834    See `meerschaum.utils.dtypes.round_time`.
1835    """
1836    from meerschaum.utils.dtypes import round_time
1837    return round_time(*args, **kwargs)
1838
1839
1840_current_module = sys.modules[__name__]
1841__all__ = tuple(
1842    name
1843    for name, obj in globals().items()
1844    if callable(obj)
1845        and name not in __pdoc__
1846        and getattr(obj, '__module__', None) == _current_module.__name__
1847        and not name.startswith('_')
1848)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
50def add_method_to_class(
51    func: Callable[[Any], Any],
52    class_def: 'Class',
53    method_name: Optional[str] = None,
54    keep_self: Optional[bool] = None,
55) -> Callable[[Any], Any]:
56    """
57    Add function `func` to class `class_def`.
58
59    Parameters
60    ----------
61    func: Callable[[Any], Any]
62        Function to be added as a method of the class
63
64    class_def: Class
65        Class to be modified.
66
67    method_name: Optional[str], default None
68        New name of the method. None will use func.__name__ (default).
69
70    Returns
71    -------
72    The modified function object.
73
74    """
75    from functools import wraps
76
77    is_class = isinstance(class_def, type)
78    
79    @wraps(func)
80    def wrapper(self, *args, **kw):
81        return func(*args, **kw)
82
83    if method_name is None:
84        method_name = func.__name__
85
86    setattr(class_def, method_name, (
87            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
88        )
89    )
90
91    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 94def generate_password(length: int = 12) -> str:
 95    """
 96    Generate a secure password of given length.
 97
 98    Parameters
 99    ----------
100    length: int, default 12
101        The length of the password.
102
103    Returns
104    -------
105    A random password string.
106    """
107    import secrets
108    import string
109    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
112def is_int(s: str) -> bool:
113    """
114    Check if string is an int.
115
116    Parameters
117    ----------
118    s: str
119        The string to be checked.
120        
121    Returns
122    -------
123    A bool indicating whether the string was able to be cast to an integer.
124
125    """
126    try:
127        return float(s).is_integer()
128    except Exception:
129        return False

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def is_uuid(s: str) -> bool:
132def is_uuid(s: str) -> bool:
133    """
134    Check if a string is a valid UUID.
135
136    Parameters
137    ----------
138    s: str
139        The string to be checked.
140
141    Returns
142    -------
143    A bool indicating whether the string is a valid UUID.
144    """
145    import uuid
146    try:
147        uuid.UUID(str(s))
148        return True
149    except Exception:
150        return False

Check if a string is a valid UUID.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string is a valid UUID.
def string_to_dict(params_string: str) -> Dict[str, Any]:
153def string_to_dict(params_string: str) -> Dict[str, Any]:
154    """
155    Parse a string into a dictionary.
156    
157    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
158
159    Parameters
160    ----------
161    params_string: str
162        The string to be parsed.
163        
164    Returns
165    -------
166    The parsed dictionary.
167
168    Examples
169    --------
170    >>> string_to_dict("a:1,b:2") 
171    {'a': 1, 'b': 2}
172    >>> string_to_dict('{"a": 1, "b": 2}')
173    {'a': 1, 'b': 2}
174
175    """
176    if not params_string:
177        return {}
178
179    import json
180
181    ### Kind of a weird edge case.
182    ### In the generated compose file, there is some weird escaping happening,
183    ### so the string to be parsed starts and ends with a single quote.
184    if (
185        isinstance(params_string, str)
186        and len(params_string) > 4
187        and params_string[1] == "{"
188        and params_string[-2] == "}"
189    ):
190        return json.loads(params_string[1:-1])
191
192    if str(params_string).startswith('{'):
193        return json.loads(params_string)
194
195    import ast
196    params_dict = {}
197    
198    items = []
199    bracket_level = 0
200    brace_level = 0
201    current_item = ''
202    in_quotes = False
203    quote_char = ''
204    
205    i = 0
206    while i < len(params_string):
207        char = params_string[i]
208        
209        if in_quotes:
210            if char == quote_char and (i == 0 or params_string[i-1] != '\\'):
211                in_quotes = False
212        else:
213            if char in ('"', "'"):
214                in_quotes = True
215                quote_char = char
216            elif char == '[':
217                bracket_level += 1
218            elif char == ']':
219                bracket_level -= 1
220            elif char == '{':
221                brace_level += 1
222            elif char == '}':
223                brace_level -= 1
224            elif char == ',' and bracket_level == 0 and brace_level == 0:
225                items.append(current_item)
226                current_item = ''
227                i += 1
228                continue
229        
230        current_item += char
231        i += 1
232        
233    if current_item:
234        items.append(current_item)
235
236    for param in items:
237        param = param.strip()
238        if not param:
239            continue
240
241        _keys = param.split(":", maxsplit=1)
242        if len(_keys) != 2:
243            continue
244
245        keys = _keys[:-1]
246        try:
247            val = ast.literal_eval(_keys[-1])
248        except Exception:
249            val = str(_keys[-1])
250
251        c = params_dict
252        for _k in keys[:-1]:
253            try:
254                k = ast.literal_eval(_k)
255            except Exception:
256                k = str(_k)
257            if k not in c:
258                c[k] = {}
259            c = c[k]
260
261        c[keys[-1]] = val
262
263    return params_dict

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def to_simple_dict(doc: Dict[str, Any]) -> str:
266def to_simple_dict(doc: Dict[str, Any]) -> str:
267    """
268    Serialize a document dictionary in simple-dict format.
269    """
270    import json
271    import ast
272    from meerschaum.utils.dtypes import json_serialize_value
273
274    def serialize_value(value):
275        if isinstance(value, str):
276            try:
277                evaluated = ast.literal_eval(value)
278                if not isinstance(evaluated, str):
279                    return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
280                return value
281            except (ValueError, SyntaxError, TypeError, MemoryError):
282                return value
283        
284        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
285
286    return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items())

Serialize a document dictionary in simple-dict format.

def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
289def parse_config_substitution(
290    value: str,
291    leading_key: str = 'MRSM',
292    begin_key: str = '{',
293    end_key: str = '}',
294    delimeter: str = ':',
295) -> List[Any]:
296    """
297    Parse Meerschaum substitution syntax
298    E.g. MRSM{value1:value2} => ['value1', 'value2']
299    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
300    """
301    if not value.beginswith(leading_key):
302        return value
303
304    return leading_key[len(leading_key):][len():-1].split(delimeter)

Parse Meerschaum substitution syntax E.g. MRSM{value1:value2} => ['value1', 'value2'] NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
307def edit_file(
308    path: Union['pathlib.Path', str],
309    default_editor: str = 'pyvim',
310    debug: bool = False
311) -> bool:
312    """
313    Open a file for editing.
314
315    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
316
317    Parameters
318    ----------
319    path: Union[pathlib.Path, str]
320        The path to the file to be edited.
321
322    default_editor: str, default 'pyvim'
323        If `$EDITOR` is not set, use this instead.
324        If `pyvim` is not installed, it will install it from PyPI.
325
326    debug: bool, default False
327        Verbosity toggle.
328
329    Returns
330    -------
331    A bool indicating the file was successfully edited.
332    """
333    import os
334    from subprocess import call
335    from meerschaum.utils.debug import dprint
336    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
337    try:
338        EDITOR = os.environ.get('EDITOR', default_editor)
339        if debug:
340            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
341        rc = call([EDITOR, path])
342    except Exception as e: ### can't open with default editors
343        if debug:
344            dprint(str(e))
345            dprint('Failed to open file with system editor. Falling back to pyvim...')
346        pyvim = attempt_import('pyvim', lazy=False)
347        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
348    return rc == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
351def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
352    """
353    Determine the columns and lines in the terminal.
354    If they cannot be determined, return the default values (100 columns and 120 lines).
355
356    Parameters
357    ----------
358    default_cols: int, default 100
359        If the columns cannot be determined, return this value.
360
361    default_lines: int, default 120
362        If the lines cannot be determined, return this value.
363
364    Returns
365    -------
366    A tuple if integers for the columns and lines.
367    """
368    import os
369    try:
370        size = os.get_terminal_size()
371        _cols, _lines = size.columns, size.lines
372    except Exception:
373        _cols, _lines = (
374            int(os.environ.get('COLUMNS', str(default_cols))),
375            int(os.environ.get('LINES', str(default_lines))),
376        )
377    return _cols, _lines

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple if integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
380def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
381    """
382    Iterate over a list in chunks.
383    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
384
385    Parameters
386    ----------
387    iterable: Iterable[Any]
388        The iterable to iterate over in chunks.
389        
390    chunksize: int
391        The size of chunks to iterate with.
392        
393    fillvalue: Optional[Any], default None
394        If the chunks do not evenly divide into the iterable, pad the end with this value.
395
396    Returns
397    -------
398    A generator of tuples of size `chunksize`.
399
400    """
401    from itertools import zip_longest
402    args = [iter(iterable)] * chunksize
403    return zip_longest(*args, fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
405def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
406    """
407    Sort a dictionary's values and return a new dictionary.
408
409    Parameters
410    ----------
411    d: Dict[Any, Any]
412        The dictionary to be sorted.
413
414    Returns
415    -------
416    A sorted dictionary.
417
418    Examples
419    --------
420    >>> sorted_dict({'b': 1, 'a': 2})
421    {'b': 1, 'a': 2}
422    >>> sorted_dict({'b': 2, 'a': 1})
423    {'a': 1, 'b': 2}
424
425    """
426    try:
427        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
428    except Exception:
429        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> 'List[Pipe]':
431def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
432    """
433    Convert the standard pipes dictionary into a list.
434
435    Parameters
436    ----------
437    pipes_dict: PipesDict
438        The pipes dictionary to be flattened.
439
440    Returns
441    -------
442    A list of `Pipe` objects.
443
444    """
445    pipes_list = []
446    for ck in pipes_dict.values():
447        for mk in ck.values():
448            pipes_list += list(mk.values())
449    return pipes_list

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
452def timed_input(
453    seconds: int = 10,
454    timeout_message: str = "",
455    prompt: str = "",
456    icon: bool = False,
457    **kw
458) -> Union[str, None]:
459    """
460    Accept user input only for a brief period of time.
461
462    Parameters
463    ----------
464    seconds: int, default 10
465        The number of seconds to wait.
466
467    timeout_message: str, default ''
468        The message to print after the window has elapsed.
469
470    prompt: str, default ''
471        The prompt to print during the window.
472
473    icon: bool, default False
474        If `True`, print the configured input icon.
475
476
477    Returns
478    -------
479    The input string entered by the user.
480
481    """
482    import signal, time
483
484    class TimeoutExpired(Exception):
485        """Raise this exception when the timeout is reached."""
486
487    def alarm_handler(signum, frame):
488        raise TimeoutExpired
489
490    # set signal handler
491    signal.signal(signal.SIGALRM, alarm_handler)
492    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
493
494    try:
495        return input(prompt)
496    except TimeoutExpired:
497        return None
498    except (EOFError, RuntimeError):
499        try:
500            print(prompt)
501            time.sleep(seconds)
502        except TimeoutExpired:
503            return None
504    finally:
505        signal.alarm(0) # cancel alarm

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def enforce_gevent_monkey_patch():
508def enforce_gevent_monkey_patch():
509    """
510    Check if gevent monkey patching is enabled, and if not, then apply patching.
511    """
512    from meerschaum.utils.packages import attempt_import
513    import socket
514    gevent, gevent_socket, gevent_monkey = attempt_import(
515        'gevent', 'gevent.socket', 'gevent.monkey'
516    )
517    if not socket.socket is gevent_socket.socket:
518        gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
520def is_valid_email(email: str) -> Union['re.Match', None]:
521    """
522    Check whether a string is a valid email.
523
524    Parameters
525    ----------
526    email: str
527        The string to be examined.
528        
529    Returns
530    -------
531    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
532
533    Examples
534    --------
535    >>> is_valid_email('foo')
536    >>> is_valid_email('foo@foo.com')
537    <re.Match object; span=(0, 11), match='foo@foo.com'>
538
539    """
540    import re
541    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
542    return re.search(regex, email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
545def string_width(string: str, widest: bool = True) -> int:
546    """
547    Calculate the width of a string, either by its widest or last line.
548
549    Parameters
550    ----------
551    string: str:
552        The string to be examined.
553        
554    widest: bool, default True
555        No longer used because `widest` is always assumed to be true.
556
557    Returns
558    -------
559    An integer for the text's visual width.
560
561    Examples
562    --------
563    >>> string_width('a')
564    1
565    >>> string_width('a\\nbc\\nd')
566    2
567
568    """
569    def _widest():
570        words = string.split('\n')
571        max_length = 0
572        for w in words:
573            length = len(w)
574            if length > max_length:
575                max_length = length
576        return max_length
577
578    return _widest()

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str:): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
624def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
625    """
626    Get a value from a dictionary with a tuple of keys.
627
628    Parameters
629    ----------
630    d: Dict[Any, Any]
631        The dictionary to search.
632
633    path: Tuple[Any, ...]
634        The path of keys to traverse.
635
636    Returns
637    -------
638    The value from the end of the path.
639    """
640    return functools.reduce(lambda di, key: di[key], path, d)

Get a value from a dictionary with a tuple of keys.

Parameters
  • d (Dict[Any, Any]): The dictionary to search.
  • path (Tuple[Any, ...]): The path of keys to traverse.
Returns
  • The value from the end of the path.
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
643def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
644    """
645    Set a value in a dictionary with a tuple of keys.
646
647    Parameters
648    ----------
649    d: Dict[Any, Any]
650        The dictionary to search.
651
652    path: Tuple[Any, ...]
653        The path of keys to traverse.
654
655    val: Any
656        The value to set at the end of the path.
657    """
658    get_val_from_dict_path(d, path[:-1])[path[-1]] = val

Set a value in a dictionary with a tuple of keys.

Parameters
  • d (Dict[Any, Any]): The dictionary to search.
  • path (Tuple[Any, ...]): The path of keys to traverse.
  • val (Any): The value to set at the end of the path.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
661def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
662    """
663    Recursively replace passwords in a dictionary.
664
665    Parameters
666    ----------
667    d: Dict[str, Any]
668        The dictionary to search through.
669
670    replace_with: str, default '*'
671        The string to replace each character of the password with.
672
673    Returns
674    -------
675    Another dictionary where values to the keys `'password'`
676    are replaced with `replace_with` (`'*'`).
677
678    Examples
679    --------
680    >>> replace_password({'a': 1})
681    {'a': 1}
682    >>> replace_password({'password': '123'})
683    {'password': '***'}
684    >>> replace_password({'nested': {'password': '123'}})
685    {'nested': {'password': '***'}}
686    >>> replace_password({'password': '123'}, replace_with='!')
687    {'password': '!!!'}
688
689    """
690    import copy
691    _d = copy.deepcopy(d)
692    for k, v in d.items():
693        if isinstance(v, dict):
694            _d[k] = replace_password(v)
695        elif 'password' in str(k).lower():
696            _d[k] = ''.join([replace_with for char in str(v)])
697        elif str(k).lower() == 'uri':
698            from meerschaum.connectors.sql import SQLConnector
699            try:
700                uri_params = SQLConnector.parse_uri(v)
701            except Exception:
702                uri_params = None
703            if not uri_params:
704                continue
705            if not 'username' in uri_params or not 'password' in uri_params:
706                continue
707            _d[k] = v.replace(
708                uri_params['username'] + ':' + uri_params['password'],
709                uri_params['username'] + ':' + ''.join(
710                    [replace_with for char in str(uri_params['password'])]
711                )
712            )
713    return _d

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where values to the keys 'password'
  • are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments( func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Tuple[Tuple[Any], Dict[str, Any]]:
716def filter_arguments(
717    func: Callable[[Any], Any],
718    *args: Any,
719    **kwargs: Any
720) -> Tuple[Tuple[Any], Dict[str, Any]]:
721    """
722    Filter out unsupported positional and keyword arguments.
723
724    Parameters
725    ----------
726    func: Callable[[Any], Any]
727        The function to inspect.
728
729    *args: Any
730        Positional arguments to filter and pass to `func`.
731
732    **kwargs
733        Keyword arguments to filter and pass to `func`.
734
735    Returns
736    -------
737    The `args` and `kwargs` accepted by `func`.
738    """
739    args = filter_positionals(func, *args)
740    kwargs = filter_keywords(func, **kwargs)
741    return args, kwargs

Filter out unsupported positional and keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): Positional arguments to filter and pass to func.
  • **kwargs: Keyword arguments to filter and pass to func.
Returns
  • The args and kwargs accepted by func.
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
744def filter_keywords(
745    func: Callable[[Any], Any],
746    **kw: Any
747) -> Dict[str, Any]:
748    """
749    Filter out unsupported keyword arguments.
750
751    Parameters
752    ----------
753    func: Callable[[Any], Any]
754        The function to inspect.
755
756    **kw: Any
757        The arguments to be filtered and passed into `func`.
758
759    Returns
760    -------
761    A dictionary of keyword arguments accepted by `func`.
762
763    Examples
764    --------
765    ```python
766    >>> def foo(a=1, b=2):
767    ...     return a * b
768    >>> filter_keywords(foo, a=2, b=4, c=6)
769    {'a': 2, 'b': 4}
770    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
771    8
772    ```
773
774    """
775    import inspect
776    func_params = inspect.signature(func).parameters
777    ### If the function has a **kw method, skip filtering.
778    for param, _type in func_params.items():
779        if '**' in str(_type):
780            return kw
781    return {k: v for k, v in kw.items() if k in func_params}

Filter out unsupported keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(func: Callable[[Any], Any], *args: Any) -> Tuple[Any]:
784def filter_positionals(
785    func: Callable[[Any], Any],
786    *args: Any
787) -> Tuple[Any]:
788    """
789    Filter out unsupported positional arguments.
790
791    Parameters
792    ----------
793    func: Callable[[Any], Any]
794        The function to inspect.
795
796    *args: Any
797        The arguments to be filtered and passed into `func`.
798        NOTE: If the function signature expects more arguments than provided,
799        the missing slots will be filled with `None`.
800
801    Returns
802    -------
803    A tuple of positional arguments accepted by `func`.
804
805    Examples
806    --------
807    ```python
808    >>> def foo(a, b):
809    ...     return a * b
810    >>> filter_positionals(foo, 2, 4, 6)
811    (2, 4)
812    >>> foo(*filter_positionals(foo, 2, 4, 6))
813    8
814    ```
815
816    """
817    import inspect
818    from meerschaum.utils.warnings import warn
819    func_params = inspect.signature(func).parameters
820    acceptable_args: List[Any] = []
821
822    def _warn_invalids(_num_invalid):
823        if _num_invalid > 0:
824            warn(
825                "Too few arguments were provided. "
826                + f"{_num_invalid} argument"
827                + ('s have ' if _num_invalid != 1 else " has ")
828                + " been filled with `None`.",
829            )
830
831    num_invalid: int = 0
832    for i, (param, val) in enumerate(func_params.items()):
833        if '=' in str(val) or '*' in str(val):
834            _warn_invalids(num_invalid)
835            return tuple(acceptable_args)
836
837        try:
838            acceptable_args.append(args[i])
839        except IndexError:
840            acceptable_args.append(None)
841            num_invalid += 1
842
843    _warn_invalids(num_invalid)
844    return tuple(acceptable_args)

Filter out unsupported positional arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
  • A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
...     return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
847def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
848    """
849    Convert an ordered dict to a dict.
850    Does not mutate the original OrderedDict.
851    """
852    from collections import OrderedDict
853    _d = dict(od)
854    for k, v in od.items():
855        if isinstance(v, OrderedDict) or (
856            issubclass(type(v), OrderedDict)
857        ):
858            _d[k] = dict_from_od(v)
859    return _d

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
862def remove_ansi(s: str) -> str:
863    """
864    Remove ANSI escape characters from a string.
865
866    Parameters
867    ----------
868    s: str:
869        The string to be cleaned.
870
871    Returns
872    -------
873    A string with the ANSI characters removed.
874
875    Examples
876    --------
877    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
878    'Hello, World!'
879
880    """
881    import re
882    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str:): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("Hello, World!")
'Hello, World!'
def get_connector_labels( *types: str, search_term: str = '', ignore_exact_match=True, _additional_options: Optional[List[str]] = None) -> List[str]:
885def get_connector_labels(
886    *types: str,
887    search_term: str = '',
888    ignore_exact_match = True,
889    _additional_options: Optional[List[str]] = None,
890) -> List[str]:
891    """
892    Read connector labels from the configuration dictionary.
893
894    Parameters
895    ----------
896    *types: str
897        The connector types.
898        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
899
900    search_term: str, default ''
901        A filter on the connectors' labels.
902
903    ignore_exact_match: bool, default True
904        If `True`, skip a connector if the search_term is an exact match.
905
906    Returns
907    -------
908    A list of the keys of defined connectors.
909
910    """
911    from meerschaum.config import get_config
912    connectors = get_config('meerschaum', 'connectors')
913
914    _types = list(types)
915    if len(_types) == 0:
916        _types = list(connectors.keys()) + ['plugin']
917
918    conns = []
919    for t in _types:
920        if t == 'plugin':
921            from meerschaum.plugins import get_data_plugins
922            conns += [
923                f'{t}:' + plugin.module.__name__.split('.')[-1]
924                for plugin in get_data_plugins()
925            ]
926            continue
927        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
928
929    if _additional_options:
930        conns += _additional_options
931
932    possibilities = [
933        c
934        for c in conns
935        if c.startswith(search_term)
936            and c != (
937                search_term if ignore_exact_match else ''
938            )
939    ]
940    return sorted(possibilities)

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
 943def wget(
 944    url: str,
 945    dest: Optional[Union[str, 'pathlib.Path']] = None,
 946    headers: Optional[Dict[str, Any]] = None,
 947    color: bool = True,
 948    debug: bool = False,
 949    **kw: Any
 950) -> 'pathlib.Path':
 951    """
 952    Mimic `wget` with `requests`.
 953
 954    Parameters
 955    ----------
 956    url: str
 957        The URL to the resource to be downloaded.
 958
 959    dest: Optional[Union[str, pathlib.Path]], default None
 960        The destination path of the downloaded file.
 961        If `None`, save to the current directory.
 962
 963    color: bool, default True
 964        If `debug` is `True`, print color output.
 965
 966    debug: bool, default False
 967        Verbosity toggle.
 968
 969    Returns
 970    -------
 971    The path to the downloaded file.
 972
 973    """
 974    from meerschaum.utils.warnings import warn, error
 975    from meerschaum.utils.debug import dprint
 976    import os, pathlib, re, urllib.request
 977    if headers is None:
 978        headers = {}
 979    request = urllib.request.Request(url, headers=headers)
 980    if not color:
 981        dprint = print
 982    if debug:
 983        dprint(f"Downloading from '{url}'...")
 984    try:
 985        response = urllib.request.urlopen(request)
 986    except Exception as e:
 987        import ssl
 988        ssl._create_default_https_context = ssl._create_unverified_context
 989        try:
 990            response = urllib.request.urlopen(request)
 991        except Exception as _e:
 992            print(_e)
 993            response = None
 994    if response is None or response.code != 200:
 995        error_msg = f"Failed to download from '{url}'."
 996        if color:
 997            error(error_msg)
 998        else:
 999            print(error_msg)
1000            import sys
1001            sys.exit(1)
1002
1003    d = response.headers.get('content-disposition', None)
1004    fname = (
1005        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1006        else url.split('/')[-1]
1007    )
1008
1009    if dest is None:
1010        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1011    elif isinstance(dest, str):
1012        dest = pathlib.Path(dest)
1013
1014    with open(dest, 'wb') as f:
1015        f.write(response.fp.read())
1016
1017    if debug:
1018        dprint(f"Downloaded file '{dest}'.")
1019
1020    return dest

Mimic wget with requests.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
1023def async_wrap(func):
1024    """
1025    Run a synchronous function as async.
1026    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1027    """
1028    import asyncio
1029    from functools import wraps, partial
1030
1031    @wraps(func)
1032    async def run(*args, loop=None, executor=None, **kwargs):
1033        if loop is None:
1034            loop = asyncio.get_event_loop()
1035        pfunc = partial(func, *args, **kwargs)
1036        return await loop.run_in_executor(executor, pfunc)
1037    return run
def debug_trace(browser: bool = True):
1040def debug_trace(browser: bool = True):
1041    """
1042    Open a web-based debugger to trace the execution of the program.
1043
1044    This is an alias import for `meerschaum.utils.debug.debug_trace`.
1045    """
1046    from meerschaum.utils.debug import trace
1047    trace(browser=browser)

Open a web-based debugger to trace the execution of the program.

This is an alias import for meerschaum.utils.debug.debug_trace.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
1050def items_str(
1051    items: List[Any],
1052    quotes: bool = True,
1053    quote_str: str = "'",
1054    commas: bool = True,
1055    comma_str: str = ',',
1056    and_: bool = True,
1057    and_str: str = 'and',
1058    oxford_comma: bool = True,
1059    spaces: bool = True,
1060    space_str = ' ',
1061) -> str:
1062    """
1063    Return a formatted string if list items separated by commas.
1064
1065    Parameters
1066    ----------
1067    items: [List[Any]]
1068        The items to be printed as an English list.
1069
1070    quotes: bool, default True
1071        If `True`, wrap items in quotes.
1072
1073    quote_str: str, default "'"
1074        If `quotes` is `True`, prepend and append each item with this string.
1075
1076    and_: bool, default True
1077        If `True`, include the word 'and' before the final item in the list.
1078
1079    and_str: str, default 'and'
1080        If `and_` is True, insert this string where 'and' normally would in and English list.
1081
1082    oxford_comma: bool, default True
1083        If `True`, include the Oxford Comma (comma before the final 'and').
1084        Only applies when `and_` is `True`.
1085
1086    spaces: bool, default True
1087        If `True`, separate items with `space_str`
1088
1089    space_str: str, default ' '
1090        If `spaces` is `True`, separate items with this string.
1091
1092    Returns
1093    -------
1094    A string of the items as an English list.
1095
1096    Examples
1097    --------
1098    >>> items_str([1,2,3])
1099    "'1', '2', and '3'"
1100    >>> items_str([1,2,3], quotes=False)
1101    '1, 2, and 3'
1102    >>> items_str([1,2,3], and_=False)
1103    "'1', '2', '3'"
1104    >>> items_str([1,2,3], spaces=False, and_=False)
1105    "'1','2','3'"
1106    >>> items_str([1,2,3], oxford_comma=False)
1107    "'1', '2' and '3'"
1108    >>> items_str([1,2,3], quote_str=":")
1109    ':1:, :2:, and :3:'
1110    >>> items_str([1,2,3], and_str="or")
1111    "'1', '2', or '3'"
1112    >>> items_str([1,2,3], space_str="_")
1113    "'1',_'2',_and_'3'"
1114
1115    """
1116    if not items:
1117        return ''
1118    
1119    q = quote_str if quotes else ''
1120    s = space_str if spaces else ''
1121    a = and_str if and_ else ''
1122    c = comma_str if commas else ''
1123
1124    if len(items) == 1:
1125        return q + str(list(items)[0]) + q
1126
1127    if len(items) == 2:
1128        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1129
1130    sep = q + c + s + q
1131    output = q + sep.join(str(i) for i in items[:-1]) + q
1132    if oxford_comma:
1133        output += c
1134    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1135    return output

Return a formatted string if list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in and English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int], round_unit: bool = False) -> str:
1138def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
1139    """
1140    Return a human-readable string for a `timedelta` (or `int` minutes).
1141
1142    Parameters
1143    ----------
1144    delta: Union[timedelta, int]
1145        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
1146
1147    round_unit: bool, default False
1148        If `True`, round the output to a single unit.
1149
1150    Returns
1151    -------
1152    A formatted string, fit for human eyes.
1153    """
1154    from meerschaum.utils.packages import attempt_import
1155    if is_int(str(delta)) and not round_unit:
1156        return str(delta)
1157
1158    humanfriendly = attempt_import('humanfriendly', lazy=False)
1159    delta_seconds = (
1160        delta.total_seconds()
1161        if hasattr(delta, 'total_seconds')
1162        else (delta * 60)
1163    )
1164
1165    is_negative = delta_seconds < 0
1166    delta_seconds = abs(delta_seconds)
1167    replace_units = {}
1168
1169    if round_unit:
1170        if delta_seconds < 1:
1171            delta_seconds = round(delta_seconds, 2)
1172        elif delta_seconds < 60:
1173            delta_seconds = int(delta_seconds)
1174        elif delta_seconds < 3600:
1175            delta_seconds = int(delta_seconds / 60) * 60
1176        elif delta_seconds < 86400:
1177            delta_seconds = int(delta_seconds / 3600) * 3600
1178        elif delta_seconds < (86400 * 7):
1179            delta_seconds = int(delta_seconds / 86400) * 86400
1180        elif delta_seconds < (86400 * 7 * 4):
1181            delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7)
1182        elif delta_seconds < (86400 * 7 * 4 * 13):
1183            delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7)
1184            replace_units['weeks'] = 'months'
1185        else:
1186            delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364)
1187
1188    delta_str = humanfriendly.format_timespan(delta_seconds)
1189    if ',' in delta_str and round_unit:
1190        delta_str = delta_str.split(',')[0]
1191    elif ' and ' in delta_str and round_unit:
1192        delta_str = delta_str.split(' and ')[0]
1193
1194    for parsed_unit, replacement_unit in replace_units.items():
1195        delta_str = delta_str.replace(parsed_unit, replacement_unit)
1196
1197    return delta_str + (' ago' if is_negative else '')

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
  • round_unit (bool, default False): If True, round the output to a single unit.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
1200def is_docker_available() -> bool:
1201    """Check if we can connect to the Docker engine."""
1202    import subprocess
1203    try:
1204        has_docker = subprocess.call(
1205            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1206        ) == 0
1207    except Exception:
1208        has_docker = False
1209    return has_docker

Check if we can connect to the Docker engine.

def is_android() -> bool:
1212def is_android() -> bool:
1213    """Return `True` if the current platform is Android."""
1214    import sys
1215    return hasattr(sys, 'getandroidapilevel')

Return True if the current platform is Android.

def is_bcp_available() -> bool:
1218def is_bcp_available() -> bool:
1219    """Check if the MSSQL `bcp` utility is installed."""
1220    import subprocess
1221
1222    try:
1223        has_bcp = subprocess.call(
1224            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1225        ) == 0
1226    except Exception:
1227        has_bcp = False
1228    return has_bcp

Check if the MSSQL bcp utility is installed.

def is_systemd_available() -> bool:
1231def is_systemd_available() -> bool:
1232    """Check if running on systemd."""
1233    import subprocess
1234    try:
1235        has_systemctl = subprocess.call(
1236            ['systemctl', 'whoami'],
1237            stdout=subprocess.DEVNULL,
1238            stderr=subprocess.STDOUT,
1239        ) == 0
1240    except FileNotFoundError:
1241        has_systemctl = False
1242    except Exception:
1243        import traceback
1244        traceback.print_exc()
1245        has_systemctl = False
1246    return has_systemctl

Check if running on systemd.

def is_tmux_available() -> bool:
1249def is_tmux_available() -> bool:
1250    """
1251    Check if `tmux` is installed.
1252    """
1253    import subprocess
1254    try:
1255        has_tmux = subprocess.call(
1256            ['tmux', '-V'],
1257            stdout=subprocess.DEVNULL,
1258            stderr=subprocess.STDOUT
1259        ) == 0
1260    except FileNotFoundError:
1261        has_tmux = False
1262    except Exception:
1263        has_tmux = False
1264    return has_tmux

Check if tmux is installed.

def get_last_n_lines(file_name: str, N: int):
1266def get_last_n_lines(file_name: str, N: int):
1267    """
1268    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/
1269    """
1270    import os
1271    # Create an empty list to keep the track of last N lines
1272    list_of_lines = []
1273    # Open file for reading in binary mode
1274    with open(file_name, 'rb') as read_obj:
1275        # Move the cursor to the end of the file
1276        read_obj.seek(0, os.SEEK_END)
1277        # Create a buffer to keep the last read line
1278        buffer = bytearray()
1279        # Get the current position of pointer i.e eof
1280        pointer_location = read_obj.tell()
1281        # Loop till pointer reaches the top of the file
1282        while pointer_location >= 0:
1283            # Move the file pointer to the location pointed by pointer_location
1284            read_obj.seek(pointer_location)
1285            # Shift pointer location by -1
1286            pointer_location = pointer_location -1
1287            # read that byte / character
1288            new_byte = read_obj.read(1)
1289            # If the read byte is new line character then it means one line is read
1290            if new_byte == b'\n':
1291                # Save the line in list of lines
1292                list_of_lines.append(buffer.decode()[::-1])
1293                # If the size of list reaches N, then return the reversed list
1294                if len(list_of_lines) == N:
1295                    return list(reversed(list_of_lines))
1296                # Reinitialize the byte array to save next line
1297                buffer = bytearray()
1298            else:
1299                # If last read character is not eol then add it in buffer
1300                buffer.extend(new_byte)
1301        # As file is read completely, if there is still data in buffer, then its first line.
1302        if len(buffer) > 0:
1303            list_of_lines.append(buffer.decode()[::-1])
1304    # return the reversed list
1305    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
1308def tail(f, n, offset=None):
1309    """
1310    https://stackoverflow.com/a/692616/9699829
1311    
1312    Reads n lines from f with an offset of offset lines.  The return
1313    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
1314    an indicator that is `True` if there are more lines in the file.
1315    """
1316    avg_line_length = 74
1317    to_read = n + (offset or 0)
1318
1319    while True:
1320        try:
1321            f.seek(-(avg_line_length * to_read), 2)
1322        except IOError:
1323            # woops.  apparently file is smaller than what we want
1324            # to step back, go to the beginning instead
1325            f.seek(0)
1326        pos = f.tell()
1327        lines = f.read().splitlines()
1328        if len(lines) >= to_read or pos == 0:
1329            return lines[-to_read:offset and -offset or None], \
1330                   len(lines) > to_read or pos > 0
1331        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1334def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1335    """
1336    Remove characters from each section of a string until the length is within the limit.
1337
1338    Parameters
1339    ----------
1340    item: str
1341        The item name to be truncated.
1342
1343    delimeter: str, default '_'
1344        Split `item` by this string into several sections.
1345
1346    max_len: int, default 128
1347        The max acceptable length of the truncated version of `item`.
1348
1349    Returns
1350    -------
1351    The truncated string.
1352
1353    Examples
1354    --------
1355    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1356    'ab_de_gh'
1357
1358    """
1359    if len(item) < max_len:
1360        return item
1361
1362    def _shorten(s: str) -> str:
1363        return s[:-1] if len(s) > 1 else s
1364
1365    sections = list(enumerate(item.split('_')))
1366    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1367    available_chars = max_len - len(sections)
1368
1369    _sections = [(i, s) for i, s in sorted_sections]
1370    _sections_len = sum([len(s) for i, s in _sections])
1371    _old_sections_len = _sections_len
1372    while _sections_len > available_chars:
1373        _sections = [(i, _shorten(s)) for i, s in _sections]
1374        _old_sections_len = _sections_len
1375        _sections_len = sum([len(s) for i, s in _sections])
1376        if _old_sections_len == _sections_len:
1377            raise Exception(f"String could not be truncated: '{item}'")
1378
1379    new_sections = sorted(_sections, key=lambda x: x[0])
1380    return delimeter.join([s for i, s in new_sections])

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def truncate_text_for_display(text: str, max_length: int = 50, suffix: str = '…') -> str:
1383def truncate_text_for_display(
1384    text: str,
1385    max_length: int = 50,
1386    suffix: str = '…',
1387) -> str:
1388    """
1389    Truncate a potentially long string for display purposes.
1390
1391    Parameters
1392    ----------
1393    text: str
1394        The string to be truncated.
1395
1396    max_length: int, default 60
1397        The maximum length of `text` before truncation.
1398
1399    suffix: str, default '…'
1400        The string to append to the length of `text` to indicate truncation.
1401
1402    Returns
1403    -------
1404    A string of length `max_length` or less.
1405    """
1406    text_length = len(text)
1407    if text_length <= max_length:
1408        return text
1409
1410    suffix_length = len(suffix)
1411
1412    truncated_text = text[:max_length - suffix_length]
1413    return truncated_text + suffix

Truncate a potentially long string for display purposes.

Parameters
  • text (str): The string to be truncated.
  • max_length (int, default 60): The maximum length of text before truncation.
  • suffix (str, default '…'): The string to append to the length of text to indicate truncation.
Returns
  • A string of length max_length or less.
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
1416def separate_negation_values(
1417    vals: Union[List[str], Tuple[str]],
1418    negation_prefix: Optional[str] = None,
1419) -> Tuple[List[str], List[str]]:
1420    """
1421    Separate the negated values from the positive ones.
1422    Return two lists: positive and negative values.
1423
1424    Parameters
1425    ----------
1426    vals: Union[List[str], Tuple[str]]
1427        A list of strings to parse.
1428
1429    negation_prefix: Optional[str], default None
1430        Include values that start with this string in the second list.
1431        If `None`, use the system default (`_`).
1432    """
1433    if negation_prefix is None:
1434        from meerschaum._internal.static import STATIC_CONFIG
1435        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
1436    _in_vals, _ex_vals = [], []
1437    for v in vals:
1438        if str(v).startswith(negation_prefix):
1439            _ex_vals.append(str(v)[len(negation_prefix):])
1440        else:
1441            _in_vals.append(v)
1442
1443    return _in_vals, _ex_vals

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1446def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1447    """
1448    Translate a params dictionary into lists of include- and exclude-values.
1449
1450    Parameters
1451    ----------
1452    params: Optional[Dict[str, Any]]
1453        A params query dictionary.
1454
1455    Returns
1456    -------
1457    A dictionary mapping keys to a tuple of lists for include and exclude values.
1458
1459    Examples
1460    --------
1461    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
1462    {'a': (['b', 'c', 'e'], ['d', 'f'])}
1463    """
1464    if not params:
1465        return {}
1466    return {
1467        col: separate_negation_values(
1468            (
1469                val
1470                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
1471                else [val]
1472            )
1473        )
1474        for col, val in params.items()
1475    }

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
1478def flatten_list(list_: List[Any]) -> List[Any]:
1479    """
1480    Recursively flatten a list.
1481    """
1482    for item in list_:
1483        if isinstance(item, list):
1484            yield from flatten_list(item)
1485        else:
1486            yield item

Recursively flatten a list.

def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1489def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1490    """
1491    Parse a string containing the text to be passed into a function
1492    and return a tuple of args, kwargs.
1493
1494    Parameters
1495    ----------
1496    args_str: str
1497        The contents of the function parameter (as a string).
1498
1499    Returns
1500    -------
1501    A tuple of args (tuple) and kwargs (dict[str, Any]).
1502
1503    Examples
1504    --------
1505    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
1506    (123, 456), {'foo': 789, 'bar': 'baz'}
1507    """
1508    import ast
1509    args = []
1510    kwargs = {}
1511
1512    for part in args_str.split(','):
1513        if '=' in part:
1514            key, val = part.split('=', 1)
1515            kwargs[key.strip()] = ast.literal_eval(val)
1516        else:
1517            args.append(ast.literal_eval(part.strip()))
1518
1519    return tuple(args), kwargs

Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.

Parameters
  • args_str (str): The contents of the function parameter (as a string).
Returns
  • A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
def parametrized(dec):
1593def parametrized(dec):
1594    """
1595    A meta-decorator for allowing other decorator functions to have parameters.
1596
1597    https://stackoverflow.com/a/26151604/9699829
1598    """
1599    def layer(*args, **kwargs):
1600        def repl(f):
1601            return dec(f, *args, **kwargs)
1602        return repl
1603    return layer

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
1606def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
1607    """
1608    Safely extract a TAR file to a give directory.
1609    This defends against CVE-2007-4559.
1610
1611    Parameters
1612    ----------
1613    tarf: file
1614        The TAR file opened with `tarfile.open(path, 'r:gz')`.
1615
1616    output_dir: Union[str, pathlib.Path]
1617        The output directory.
1618    """
1619    import os
1620
1621    def is_within_directory(directory, target):
1622        abs_directory = os.path.abspath(directory)
1623        abs_target = os.path.abspath(target)
1624        prefix = os.path.commonprefix([abs_directory, abs_target])
1625        return prefix == abs_directory 
1626
1627    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
1628        for member in tar.getmembers():
1629            member_path = os.path.join(path, member.name)
1630            if not is_within_directory(path, member_path):
1631                raise Exception("Attempted Path Traversal in Tar File")
1632
1633        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)
1634
1635    return safe_extract(tarf, output_dir)

Safely extract a TAR file to a give directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def to_snake_case(name: str) -> str:
1638def to_snake_case(name: str) -> str:
1639    """
1640    Return the given string in snake-case-style.
1641
1642    Parameters
1643    ----------
1644    name: str
1645        The input text to convert to snake case.
1646
1647    Returns
1648    -------
1649    A snake-case version of `name`.
1650
1651    Examples
1652    --------
1653    >>> to_snake_case("HelloWorld!")
1654    'hello_world'
1655    >>> to_snake_case("This has spaces in it.")
1656    'this_has_spaces_in_it'
1657    >>> to_snake_case("already_in_snake_case")
1658    'already_in_snake_case'
1659    """
1660    import re
1661    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
1662    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
1663    name = re.sub(r'[^\w\s]', '', name)
1664    name = re.sub(r'\s+', '_', name)
1665    return name.lower()

Return the given string in snake-case-style.

Parameters
  • name (str): The input text to convert to snake case.
Returns
  • A snake-case version of name.
Examples
>>> to_snake_case("HelloWorld!")
'hello_world'
>>> to_snake_case("This has spaces in it.")
'this_has_spaces_in_it'
>>> to_snake_case("already_in_snake_case")
'already_in_snake_case'
def choose_subaction(*args, **kwargs) -> Any:
1672def choose_subaction(*args, **kwargs) -> Any:
1673    """
1674    Placeholder function to prevent breaking legacy behavior.
1675    See `meerschaum.actions.choose_subaction`.
1676    """
1677    from meerschaum.actions import choose_subaction as _choose_subactions
1678    return _choose_subactions(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choose_subaction.

def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
1798def json_serialize_datetime(dt: datetime) -> Union[str, None]:
1799    """
1800    Serialize a datetime object into JSON (ISO format string).
1801
1802    Examples
1803    --------
1804    >>> import json
1805    >>> from datetime import datetime
1806    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
1807    '{"a": "2022-01-01T00:00:00Z"}'
1808
1809    """
1810    from meerschaum.utils.dtypes import serialize_datetime
1811    return serialize_datetime(dt)

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'