meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9
  10import os
  11import sys
  12import functools
  13import pathlib
  14from datetime import timedelta, datetime
  15
  16from meerschaum.utils.typing import (
  17    Union,
  18    Any,
  19    Callable,
  20    Optional,
  21    List,
  22    Dict,
  23    SuccessTuple,
  24    Iterable,
  25    PipesDict,
  26    Tuple,
  27    TYPE_CHECKING,
  28)
  29if TYPE_CHECKING:
  30    import collections
  31
  32__pdoc__: Dict[str, bool] = {
  33    'to_pandas_dtype': False,
  34    'filter_unseen_df': False,
  35    'add_missing_cols_to_df': False,
  36    'parse_df_datetimes': False,
  37    'df_from_literal': False,
  38    'get_json_cols': False,
  39    'get_unhashable_cols': False,
  40    'enforce_dtypes': False,
  41    'get_datetime_bound_from_df': False,
  42    'df_is_chunk_generator': False,
  43    'choices_docstring': False,
  44    '_get_subaction_names': False,
  45    'is_pipe_registered': False,
  46    'replace_pipes_in_dict': False,
  47    'round_time': False,
  48}
  49
  50
  51def add_method_to_class(
  52    func: Callable[[Any], Any],
  53    class_def: 'Class',
  54    method_name: Optional[str] = None,
  55    keep_self: Optional[bool] = None,
  56) -> Callable[[Any], Any]:
  57    """
  58    Add function `func` to class `class_def`.
  59
  60    Parameters
  61    ----------
  62    func: Callable[[Any], Any]
  63        Function to be added as a method of the class
  64
  65    class_def: Class
  66        Class to be modified.
  67
  68    method_name: Optional[str], default None
  69        New name of the method. None will use func.__name__ (default).
  70
  71    Returns
  72    -------
  73    The modified function object.
  74
  75    """
  76    from functools import wraps
  77
  78    is_class = isinstance(class_def, type)
  79    
  80    @wraps(func)
  81    def wrapper(self, *args, **kw):
  82        return func(*args, **kw)
  83
  84    if method_name is None:
  85        method_name = func.__name__
  86
  87    setattr(class_def, method_name, (
  88            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  89        )
  90    )
  91
  92    return func
  93
  94
  95def generate_password(length: int = 12) -> str:
  96    """
  97    Generate a secure password of given length.
  98
  99    Parameters
 100    ----------
 101    length: int, default 12
 102        The length of the password.
 103
 104    Returns
 105    -------
 106    A random password string.
 107    """
 108    import secrets
 109    import string
 110    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))
 111 
 112
 113def is_int(s: str) -> bool:
 114    """
 115    Check if string is an int.
 116
 117    Parameters
 118    ----------
 119    s: str
 120        The string to be checked.
 121        
 122    Returns
 123    -------
 124    A bool indicating whether the string was able to be cast to an integer.
 125
 126    """
 127    try:
 128        return float(s).is_integer()
 129    except Exception:
 130        return False
 131
 132
 133def is_uuid(s: str) -> bool:
 134    """
 135    Check if a string is a valid UUID.
 136
 137    Parameters
 138    ----------
 139    s: str
 140        The string to be checked.
 141
 142    Returns
 143    -------
 144    A bool indicating whether the string is a valid UUID.
 145    """
 146    import uuid
 147    try:
 148        uuid.UUID(str(s))
 149        return True
 150    except Exception:
 151        return False
 152
 153
 154def string_to_dict(params_string: str) -> Dict[str, Any]:
 155    """
 156    Parse a string into a dictionary.
 157    
 158    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 159
 160    Parameters
 161    ----------
 162    params_string: str
 163        The string to be parsed.
 164        
 165    Returns
 166    -------
 167    The parsed dictionary.
 168
 169    Examples
 170    --------
 171    >>> string_to_dict("a:1,b:2") 
 172    {'a': 1, 'b': 2}
 173    >>> string_to_dict('{"a": 1, "b": 2}')
 174    {'a': 1, 'b': 2}
 175
 176    """
 177    if not params_string:
 178        return {}
 179
 180    import json
 181
 182    ### Kind of a weird edge case.
 183    ### In the generated compose file, there is some weird escaping happening,
 184    ### so the string to be parsed starts and ends with a single quote.
 185    if (
 186        isinstance(params_string, str)
 187        and len(params_string) > 4
 188        and params_string[1] == "{"
 189        and params_string[-2] == "}"
 190    ):
 191        return json.loads(params_string[1:-1])
 192
 193    if str(params_string).startswith('{'):
 194        return json.loads(params_string)
 195
 196    import ast
 197    params_dict = {}
 198    
 199    items = []
 200    bracket_level = 0
 201    brace_level = 0
 202    current_item = ''
 203    in_quotes = False
 204    quote_char = ''
 205    
 206    i = 0
 207    while i < len(params_string):
 208        char = params_string[i]
 209        
 210        if in_quotes:
 211            if char == quote_char and (i == 0 or params_string[i-1] != '\\'):
 212                in_quotes = False
 213        else:
 214            if char in ('"', "'"):
 215                in_quotes = True
 216                quote_char = char
 217            elif char == '[':
 218                bracket_level += 1
 219            elif char == ']':
 220                bracket_level -= 1
 221            elif char == '{':
 222                brace_level += 1
 223            elif char == '}':
 224                brace_level -= 1
 225            elif char == ',' and bracket_level == 0 and brace_level == 0:
 226                items.append(current_item)
 227                current_item = ''
 228                i += 1
 229                continue
 230        
 231        current_item += char
 232        i += 1
 233        
 234    if current_item:
 235        items.append(current_item)
 236
 237    for param in items:
 238        param = param.strip()
 239        if not param:
 240            continue
 241
 242        _keys = param.split(":", maxsplit=1)
 243        if len(_keys) != 2:
 244            continue
 245
 246        keys = _keys[:-1]
 247        try:
 248            val = ast.literal_eval(_keys[-1])
 249        except Exception:
 250            val = str(_keys[-1])
 251
 252        c = params_dict
 253        for _k in keys[:-1]:
 254            try:
 255                k = ast.literal_eval(_k)
 256            except Exception:
 257                k = str(_k)
 258            if k not in c:
 259                c[k] = {}
 260            c = c[k]
 261
 262        c[keys[-1]] = val
 263
 264    return params_dict
 265
 266
 267def to_simple_dict(doc: Dict[str, Any]) -> str:
 268    """
 269    Serialize a document dictionary in simple-dict format.
 270    """
 271    import json
 272    import ast
 273    from meerschaum.utils.dtypes import json_serialize_value
 274
 275    def serialize_value(value):
 276        if isinstance(value, str):
 277            try:
 278                evaluated = ast.literal_eval(value)
 279                if not isinstance(evaluated, str):
 280                    return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
 281                return value
 282            except (ValueError, SyntaxError, TypeError, MemoryError):
 283                return value
 284        
 285        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
 286
 287    return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items())
 288
 289
 290def parse_config_substitution(
 291    value: str,
 292    leading_key: str = 'MRSM',
 293    begin_key: str = '{',
 294    end_key: str = '}',
 295    delimeter: str = ':',
 296) -> List[Any]:
 297    """
 298    Parse Meerschaum substitution syntax
 299    E.g. MRSM{value1:value2} => ['value1', 'value2']
 300    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 301    """
 302    if not value.beginswith(leading_key):
 303        return value
 304
 305    return leading_key[len(leading_key):][len():-1].split(delimeter)
 306
 307
 308def edit_file(
 309    path: Union[pathlib.Path, str],
 310    default_editor: str = 'pyvim',
 311    debug: bool = False
 312) -> bool:
 313    """
 314    Open a file for editing.
 315
 316    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 317
 318    Parameters
 319    ----------
 320    path: Union[pathlib.Path, str]
 321        The path to the file to be edited.
 322
 323    default_editor: str, default 'pyvim'
 324        If `$EDITOR` is not set, use this instead.
 325        If `pyvim` is not installed, it will install it from PyPI.
 326
 327    debug: bool, default False
 328        Verbosity toggle.
 329
 330    Returns
 331    -------
 332    A bool indicating the file was successfully edited.
 333    """
 334    from subprocess import call
 335    from meerschaum.utils.debug import dprint
 336    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 337    try:
 338        EDITOR = os.environ.get('EDITOR', default_editor)
 339        if debug:
 340            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 341        rc = call([EDITOR, path])
 342    except Exception as e: ### can't open with default editors
 343        if debug:
 344            dprint(str(e))
 345            dprint('Failed to open file with system editor. Falling back to pyvim...')
 346        pyvim = attempt_import('pyvim', lazy=False)
 347        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 348    return rc == 0
 349
 350
 351def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 352    """
 353    Determine the columns and lines in the terminal.
 354    If they cannot be determined, return the default values (100 columns and 120 lines).
 355
 356    Parameters
 357    ----------
 358    default_cols: int, default 100
 359        If the columns cannot be determined, return this value.
 360
 361    default_lines: int, default 120
 362        If the lines cannot be determined, return this value.
 363
 364    Returns
 365    -------
 366    A tuple if integers for the columns and lines.
 367    """
 368    try:
 369        size = os.get_terminal_size()
 370        _cols, _lines = size.columns, size.lines
 371    except Exception:
 372        _cols, _lines = (
 373            int(os.environ.get('COLUMNS', str(default_cols))),
 374            int(os.environ.get('LINES', str(default_lines))),
 375        )
 376    return _cols, _lines
 377
 378
 379def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 380    """
 381    Iterate over a list in chunks.
 382    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 383
 384    Parameters
 385    ----------
 386    iterable: Iterable[Any]
 387        The iterable to iterate over in chunks.
 388        
 389    chunksize: int
 390        The size of chunks to iterate with.
 391        
 392    fillvalue: Optional[Any], default None
 393        If the chunks do not evenly divide into the iterable, pad the end with this value.
 394
 395    Returns
 396    -------
 397    A generator of tuples of size `chunksize`.
 398
 399    """
 400    from itertools import zip_longest
 401    args = [iter(iterable)] * chunksize
 402    return zip_longest(*args, fillvalue=fillvalue)
 403
 404def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 405    """
 406    Sort a dictionary's values and return a new dictionary.
 407
 408    Parameters
 409    ----------
 410    d: Dict[Any, Any]
 411        The dictionary to be sorted.
 412
 413    Returns
 414    -------
 415    A sorted dictionary.
 416
 417    Examples
 418    --------
 419    >>> sorted_dict({'b': 1, 'a': 2})
 420    {'b': 1, 'a': 2}
 421    >>> sorted_dict({'b': 2, 'a': 1})
 422    {'a': 1, 'b': 2}
 423
 424    """
 425    try:
 426        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 427    except Exception:
 428        return d
 429
 430
 431def timed_input(
 432    seconds: int = 10,
 433    timeout_message: str = "",
 434    prompt: str = "",
 435    icon: bool = False,
 436    **kw
 437) -> Union[str, None]:
 438    """
 439    Accept user input only for a brief period of time.
 440
 441    Parameters
 442    ----------
 443    seconds: int, default 10
 444        The number of seconds to wait.
 445
 446    timeout_message: str, default ''
 447        The message to print after the window has elapsed.
 448
 449    prompt: str, default ''
 450        The prompt to print during the window.
 451
 452    icon: bool, default False
 453        If `True`, print the configured input icon.
 454
 455
 456    Returns
 457    -------
 458    The input string entered by the user.
 459
 460    """
 461    import signal, time
 462
 463    class TimeoutExpired(Exception):
 464        """Raise this exception when the timeout is reached."""
 465
 466    def alarm_handler(signum, frame):
 467        raise TimeoutExpired
 468
 469    # set signal handler
 470    signal.signal(signal.SIGALRM, alarm_handler)
 471    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 472
 473    try:
 474        return input(prompt)
 475    except TimeoutExpired:
 476        return None
 477    except (EOFError, RuntimeError):
 478        try:
 479            print(prompt)
 480            time.sleep(seconds)
 481        except TimeoutExpired:
 482            return None
 483    finally:
 484        signal.alarm(0) # cancel alarm
 485
 486
 487def enforce_gevent_monkey_patch():
 488    """
 489    Check if gevent monkey patching is enabled, and if not, then apply patching.
 490    """
 491    from meerschaum.utils.packages import attempt_import
 492    import socket
 493    gevent, gevent_socket, gevent_monkey = attempt_import(
 494        'gevent', 'gevent.socket', 'gevent.monkey'
 495    )
 496    if not socket.socket is gevent_socket.socket:
 497        gevent_monkey.patch_all()
 498
 499def is_valid_email(email: str) -> Union['re.Match', None]:
 500    """
 501    Check whether a string is a valid email.
 502
 503    Parameters
 504    ----------
 505    email: str
 506        The string to be examined.
 507        
 508    Returns
 509    -------
 510    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 511
 512    Examples
 513    --------
 514    >>> is_valid_email('foo')
 515    >>> is_valid_email('foo@foo.com')
 516    <re.Match object; span=(0, 11), match='foo@foo.com'>
 517
 518    """
 519    import re
 520    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 521    return re.search(regex, email)
 522
 523
 524def string_width(string: str, widest: bool = True) -> int:
 525    """
 526    Calculate the width of a string, either by its widest or last line.
 527
 528    Parameters
 529    ----------
 530    string: str:
 531        The string to be examined.
 532        
 533    widest: bool, default True
 534        No longer used because `widest` is always assumed to be true.
 535
 536    Returns
 537    -------
 538    An integer for the text's visual width.
 539
 540    Examples
 541    --------
 542    >>> string_width('a')
 543    1
 544    >>> string_width('a\\nbc\\nd')
 545    2
 546
 547    """
 548    def _widest():
 549        words = string.split('\n')
 550        max_length = 0
 551        for w in words:
 552            length = len(w)
 553            if length > max_length:
 554                max_length = length
 555        return max_length
 556
 557    return _widest()
 558
 559def _pyinstaller_traverse_dir(
 560    directory: str,
 561    ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 562    include_dotfiles: bool = False
 563) -> list:
 564    """
 565    Recursively traverse a directory and return a list of its contents.
 566    """
 567    paths = []
 568    _directory = pathlib.Path(directory)
 569
 570    def _found_pattern(name: str):
 571        for pattern in ignore_patterns:
 572            if pattern.replace('/', os.path.sep) in str(name):
 573                return True
 574        return False
 575
 576    for root, dirs, files in os.walk(_directory):
 577        _root = str(root)[len(str(_directory.parent)):]
 578        if _root.startswith(os.path.sep):
 579            _root = _root[len(os.path.sep):]
 580        if _root.startswith('.') and not include_dotfiles:
 581            continue
 582        ### ignore certain patterns
 583        if _found_pattern(_root):
 584            continue
 585
 586        for filename in files:
 587            if filename.startswith('.') and not include_dotfiles:
 588                continue
 589            path = os.path.join(root, filename)
 590            if _found_pattern(path):
 591                continue
 592
 593            _path = str(path)[len(str(_directory.parent)):]
 594            if _path.startswith(os.path.sep):
 595                _path = _path[len(os.path.sep):]
 596            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 597
 598            paths.append((path, _path))
 599    return paths
 600
 601
 602def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
 603    """
 604    Get a value from a dictionary with a tuple of keys.
 605
 606    Parameters
 607    ----------
 608    d: Dict[Any, Any]
 609        The dictionary to search.
 610
 611    path: Tuple[Any, ...]
 612        The path of keys to traverse.
 613
 614    Returns
 615    -------
 616    The value from the end of the path.
 617    """
 618    return functools.reduce(lambda di, key: di[key], path, d)
 619
 620
 621def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
 622    """
 623    Set a value in a dictionary with a tuple of keys.
 624
 625    Parameters
 626    ----------
 627    d: Dict[Any, Any]
 628        The dictionary to search.
 629
 630    path: Tuple[Any, ...]
 631        The path of keys to traverse.
 632
 633    val: Any
 634        The value to set at the end of the path.
 635    """
 636    get_val_from_dict_path(d, path[:-1])[path[-1]] = val
 637
 638
 639def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 640    """
 641    Recursively replace passwords in a dictionary.
 642
 643    Parameters
 644    ----------
 645    d: Dict[str, Any]
 646        The dictionary to search through.
 647
 648    replace_with: str, default '*'
 649        The string to replace each character of the password with.
 650
 651    Returns
 652    -------
 653    Another dictionary where values to the keys `'password'`
 654    are replaced with `replace_with` (`'*'`).
 655
 656    Examples
 657    --------
 658    >>> replace_password({'a': 1})
 659    {'a': 1}
 660    >>> replace_password({'password': '123'})
 661    {'password': '***'}
 662    >>> replace_password({'nested': {'password': '123'}})
 663    {'nested': {'password': '***'}}
 664    >>> replace_password({'password': '123'}, replace_with='!')
 665    {'password': '!!!'}
 666
 667    """
 668    import copy
 669    _d = copy.deepcopy(d)
 670    for k, v in d.items():
 671        if isinstance(v, dict):
 672            _d[k] = replace_password(v)
 673        elif 'password' in str(k).lower():
 674            _d[k] = ''.join([replace_with for char in str(v)])
 675        elif str(k).lower() == 'uri':
 676            from meerschaum.connectors.sql import SQLConnector
 677            try:
 678                uri_params = SQLConnector.parse_uri(v)
 679            except Exception:
 680                uri_params = None
 681            if not uri_params:
 682                continue
 683            if not 'username' in uri_params or not 'password' in uri_params:
 684                continue
 685            _d[k] = v.replace(
 686                uri_params['username'] + ':' + uri_params['password'],
 687                uri_params['username'] + ':' + ''.join(
 688                    [replace_with for char in str(uri_params['password'])]
 689                )
 690            )
 691    return _d
 692
 693
 694def filter_arguments(
 695    func: Callable[[Any], Any],
 696    *args: Any,
 697    **kwargs: Any
 698) -> Tuple[Tuple[Any], Dict[str, Any]]:
 699    """
 700    Filter out unsupported positional and keyword arguments.
 701
 702    Parameters
 703    ----------
 704    func: Callable[[Any], Any]
 705        The function to inspect.
 706
 707    *args: Any
 708        Positional arguments to filter and pass to `func`.
 709
 710    **kwargs
 711        Keyword arguments to filter and pass to `func`.
 712
 713    Returns
 714    -------
 715    The `args` and `kwargs` accepted by `func`.
 716    """
 717    args = filter_positionals(func, *args)
 718    kwargs = filter_keywords(func, **kwargs)
 719    return args, kwargs
 720
 721
 722def filter_keywords(
 723    func: Callable[[Any], Any],
 724    **kw: Any
 725) -> Dict[str, Any]:
 726    """
 727    Filter out unsupported keyword arguments.
 728
 729    Parameters
 730    ----------
 731    func: Callable[[Any], Any]
 732        The function to inspect.
 733
 734    **kw: Any
 735        The arguments to be filtered and passed into `func`.
 736
 737    Returns
 738    -------
 739    A dictionary of keyword arguments accepted by `func`.
 740
 741    Examples
 742    --------
 743    ```python
 744    >>> def foo(a=1, b=2):
 745    ...     return a * b
 746    >>> filter_keywords(foo, a=2, b=4, c=6)
 747    {'a': 2, 'b': 4}
 748    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 749    8
 750    ```
 751
 752    """
 753    import inspect
 754    func_params = inspect.signature(func).parameters
 755    ### If the function has a **kw method, skip filtering.
 756    for param, _type in func_params.items():
 757        if '**' in str(_type):
 758            return kw
 759    return {k: v for k, v in kw.items() if k in func_params}
 760
 761
 762def filter_positionals(
 763    func: Callable[[Any], Any],
 764    *args: Any
 765) -> Tuple[Any]:
 766    """
 767    Filter out unsupported positional arguments.
 768
 769    Parameters
 770    ----------
 771    func: Callable[[Any], Any]
 772        The function to inspect.
 773
 774    *args: Any
 775        The arguments to be filtered and passed into `func`.
 776        NOTE: If the function signature expects more arguments than provided,
 777        the missing slots will be filled with `None`.
 778
 779    Returns
 780    -------
 781    A tuple of positional arguments accepted by `func`.
 782
 783    Examples
 784    --------
 785    ```python
 786    >>> def foo(a, b):
 787    ...     return a * b
 788    >>> filter_positionals(foo, 2, 4, 6)
 789    (2, 4)
 790    >>> foo(*filter_positionals(foo, 2, 4, 6))
 791    8
 792    ```
 793
 794    """
 795    import inspect
 796    from meerschaum.utils.warnings import warn
 797    func_params = inspect.signature(func).parameters
 798    acceptable_args: List[Any] = []
 799
 800    def _warn_invalids(_num_invalid):
 801        if _num_invalid > 0:
 802            warn(
 803                "Too few arguments were provided. "
 804                + f"{_num_invalid} argument"
 805                + ('s have ' if _num_invalid != 1 else " has ")
 806                + " been filled with `None`.",
 807            )
 808
 809    num_invalid: int = 0
 810    for i, (param, val) in enumerate(func_params.items()):
 811        if '=' in str(val) or '*' in str(val):
 812            _warn_invalids(num_invalid)
 813            return tuple(acceptable_args)
 814
 815        try:
 816            acceptable_args.append(args[i])
 817        except IndexError:
 818            acceptable_args.append(None)
 819            num_invalid += 1
 820
 821    _warn_invalids(num_invalid)
 822    return tuple(acceptable_args)
 823
 824
 825def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 826    """
 827    Convert an ordered dict to a dict.
 828    Does not mutate the original OrderedDict.
 829    """
 830    from collections import OrderedDict
 831    _d = dict(od)
 832    for k, v in od.items():
 833        if isinstance(v, OrderedDict) or (
 834            issubclass(type(v), OrderedDict)
 835        ):
 836            _d[k] = dict_from_od(v)
 837    return _d
 838
 839
 840def remove_ansi(s: str) -> str:
 841    """
 842    Remove ANSI escape characters from a string.
 843
 844    Parameters
 845    ----------
 846    s: str:
 847        The string to be cleaned.
 848
 849    Returns
 850    -------
 851    A string with the ANSI characters removed.
 852
 853    Examples
 854    --------
 855    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 856    'Hello, World!'
 857
 858    """
 859    import re
 860    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 861
 862
 863def get_connector_labels(
 864    *types: str,
 865    search_term: str = '',
 866    ignore_exact_match = True,
 867    _additional_options: Optional[List[str]] = None,
 868) -> List[str]:
 869    """
 870    Read connector labels from the configuration dictionary.
 871
 872    Parameters
 873    ----------
 874    *types: str
 875        The connector types.
 876        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 877
 878    search_term: str, default ''
 879        A filter on the connectors' labels.
 880
 881    ignore_exact_match: bool, default True
 882        If `True`, skip a connector if the search_term is an exact match.
 883
 884    Returns
 885    -------
 886    A list of the keys of defined connectors.
 887
 888    """
 889    from meerschaum.config import get_config
 890    connectors = get_config('meerschaum', 'connectors')
 891
 892    _types = list(types)
 893    if len(_types) == 0:
 894        _types = list(connectors.keys()) + ['plugin']
 895
 896    conns = []
 897    for t in _types:
 898        if t == 'plugin':
 899            from meerschaum.plugins import get_data_plugins
 900            conns += [
 901                f'{t}:' + plugin.module.__name__.split('.')[-1]
 902                for plugin in get_data_plugins()
 903            ]
 904            continue
 905        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 906
 907    if _additional_options:
 908        conns += _additional_options
 909
 910    possibilities = [
 911        c
 912        for c in conns
 913        if c.startswith(search_term)
 914            and c != (
 915                search_term if ignore_exact_match else ''
 916            )
 917    ]
 918    return sorted(possibilities)
 919
 920
 921def wget(
 922    url: str,
 923    dest: Optional[Union[str, 'pathlib.Path']] = None,
 924    headers: Optional[Dict[str, Any]] = None,
 925    color: bool = True,
 926    debug: bool = False,
 927    **kw: Any
 928) -> 'pathlib.Path':
 929    """
 930    Mimic `wget` with `requests`.
 931
 932    Parameters
 933    ----------
 934    url: str
 935        The URL to the resource to be downloaded.
 936
 937    dest: Optional[Union[str, pathlib.Path]], default None
 938        The destination path of the downloaded file.
 939        If `None`, save to the current directory.
 940
 941    color: bool, default True
 942        If `debug` is `True`, print color output.
 943
 944    debug: bool, default False
 945        Verbosity toggle.
 946
 947    Returns
 948    -------
 949    The path to the downloaded file.
 950
 951    """
 952    from meerschaum.utils.warnings import warn, error
 953    from meerschaum.utils.debug import dprint
 954    import re, urllib.request
 955    if headers is None:
 956        headers = {}
 957    request = urllib.request.Request(url, headers=headers)
 958    if not color:
 959        dprint = print
 960    if debug:
 961        dprint(f"Downloading from '{url}'...")
 962    try:
 963        response = urllib.request.urlopen(request)
 964    except Exception as e:
 965        import ssl
 966        ssl._create_default_https_context = ssl._create_unverified_context
 967        try:
 968            response = urllib.request.urlopen(request)
 969        except Exception as _e:
 970            print(_e)
 971            response = None
 972    if response is None or response.code != 200:
 973        error_msg = f"Failed to download from '{url}'."
 974        if color:
 975            error(error_msg)
 976        else:
 977            print(error_msg)
 978            import sys
 979            sys.exit(1)
 980
 981    d = response.headers.get('content-disposition', None)
 982    fname = (
 983        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
 984        else url.split('/')[-1]
 985    )
 986
 987    if dest is None:
 988        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
 989    elif isinstance(dest, str):
 990        dest = pathlib.Path(dest)
 991
 992    with open(dest, 'wb') as f:
 993        f.write(response.fp.read())
 994
 995    if debug:
 996        dprint(f"Downloaded file '{dest}'.")
 997
 998    return dest
 999
1000
1001def async_wrap(func):
1002    """
1003    Run a synchronous function as async.
1004    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1005    """
1006    import asyncio
1007    from functools import wraps, partial
1008
1009    @wraps(func)
1010    async def run(*args, loop=None, executor=None, **kwargs):
1011        if loop is None:
1012            loop = asyncio.get_event_loop()
1013        pfunc = partial(func, *args, **kwargs)
1014        return await loop.run_in_executor(executor, pfunc)
1015    return run
1016
1017
1018def debug_trace(browser: bool = True):
1019    """
1020    Open a web-based debugger to trace the execution of the program.
1021
1022    This is an alias import for `meerschaum.utils.debug.debug_trace`.
1023    """
1024    from meerschaum.utils.debug import trace
1025    trace(browser=browser)
1026
1027
1028def items_str(
1029    items: List[Any],
1030    quotes: bool = True,
1031    quote_str: str = "'",
1032    commas: bool = True,
1033    comma_str: str = ',',
1034    and_: bool = True,
1035    and_str: str = 'and',
1036    oxford_comma: bool = True,
1037    spaces: bool = True,
1038    space_str = ' ',
1039) -> str:
1040    """
1041    Return a formatted string if list items separated by commas.
1042
1043    Parameters
1044    ----------
1045    items: [List[Any]]
1046        The items to be printed as an English list.
1047
1048    quotes: bool, default True
1049        If `True`, wrap items in quotes.
1050
1051    quote_str: str, default "'"
1052        If `quotes` is `True`, prepend and append each item with this string.
1053
1054    and_: bool, default True
1055        If `True`, include the word 'and' before the final item in the list.
1056
1057    and_str: str, default 'and'
1058        If `and_` is True, insert this string where 'and' normally would in and English list.
1059
1060    oxford_comma: bool, default True
1061        If `True`, include the Oxford Comma (comma before the final 'and').
1062        Only applies when `and_` is `True`.
1063
1064    spaces: bool, default True
1065        If `True`, separate items with `space_str`
1066
1067    space_str: str, default ' '
1068        If `spaces` is `True`, separate items with this string.
1069
1070    Returns
1071    -------
1072    A string of the items as an English list.
1073
1074    Examples
1075    --------
1076    >>> items_str([1,2,3])
1077    "'1', '2', and '3'"
1078    >>> items_str([1,2,3], quotes=False)
1079    '1, 2, and 3'
1080    >>> items_str([1,2,3], and_=False)
1081    "'1', '2', '3'"
1082    >>> items_str([1,2,3], spaces=False, and_=False)
1083    "'1','2','3'"
1084    >>> items_str([1,2,3], oxford_comma=False)
1085    "'1', '2' and '3'"
1086    >>> items_str([1,2,3], quote_str=":")
1087    ':1:, :2:, and :3:'
1088    >>> items_str([1,2,3], and_str="or")
1089    "'1', '2', or '3'"
1090    >>> items_str([1,2,3], space_str="_")
1091    "'1',_'2',_and_'3'"
1092
1093    """
1094    if not items:
1095        return ''
1096    
1097    q = quote_str if quotes else ''
1098    s = space_str if spaces else ''
1099    a = and_str if and_ else ''
1100    c = comma_str if commas else ''
1101
1102    if len(items) == 1:
1103        return q + str(list(items)[0]) + q
1104
1105    if len(items) == 2:
1106        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1107
1108    sep = q + c + s + q
1109    output = q + sep.join(str(i) for i in items[:-1]) + q
1110    if oxford_comma:
1111        output += c
1112    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1113    return output
1114
1115
1116def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
1117    """
1118    Return a human-readable string for a `timedelta` (or `int` minutes).
1119
1120    Parameters
1121    ----------
1122    delta: Union[timedelta, int]
1123        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
1124
1125    round_unit: bool, default False
1126        If `True`, round the output to a single unit.
1127
1128    Returns
1129    -------
1130    A formatted string, fit for human eyes.
1131    """
1132    from meerschaum.utils.packages import attempt_import
1133    if is_int(str(delta)) and not round_unit:
1134        return str(delta)
1135
1136    humanfriendly = attempt_import('humanfriendly', lazy=False)
1137    delta_seconds = (
1138        delta.total_seconds()
1139        if hasattr(delta, 'total_seconds')
1140        else (delta * 60)
1141    )
1142
1143    is_negative = delta_seconds < 0
1144    delta_seconds = abs(delta_seconds)
1145    replace_units = {}
1146
1147    if round_unit:
1148        if delta_seconds < 1:
1149            delta_seconds = round(delta_seconds, 2)
1150        elif delta_seconds < 60:
1151            delta_seconds = int(delta_seconds)
1152        elif delta_seconds < 3600:
1153            delta_seconds = int(delta_seconds / 60) * 60
1154        elif delta_seconds < 86400:
1155            delta_seconds = int(delta_seconds / 3600) * 3600
1156        elif delta_seconds < (86400 * 7):
1157            delta_seconds = int(delta_seconds / 86400) * 86400
1158        elif delta_seconds < (86400 * 7 * 4):
1159            delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7)
1160        elif delta_seconds < (86400 * 7 * 4 * 13):
1161            delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7)
1162            replace_units['weeks'] = 'months'
1163        else:
1164            delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364)
1165
1166    delta_str = humanfriendly.format_timespan(delta_seconds)
1167    if ',' in delta_str and round_unit:
1168        delta_str = delta_str.split(',')[0]
1169    elif ' and ' in delta_str and round_unit:
1170        delta_str = delta_str.split(' and ')[0]
1171
1172    for parsed_unit, replacement_unit in replace_units.items():
1173        delta_str = delta_str.replace(parsed_unit, replacement_unit)
1174
1175    return delta_str + (' ago' if is_negative else '')
1176
1177
1178def is_docker_available() -> bool:
1179    """Check if we can connect to the Docker engine."""
1180    import subprocess
1181    try:
1182        has_docker = subprocess.call(
1183            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1184        ) == 0
1185    except Exception:
1186        has_docker = False
1187    return has_docker
1188
1189
1190def is_android() -> bool:
1191    """Return `True` if the current platform is Android."""
1192    import sys
1193    return hasattr(sys, 'getandroidapilevel')
1194
1195
1196def is_bcp_available() -> bool:
1197    """Check if the MSSQL `bcp` utility is installed."""
1198    import subprocess
1199
1200    try:
1201        has_bcp = subprocess.call(
1202            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1203        ) == 0
1204    except Exception:
1205        has_bcp = False
1206    return has_bcp
1207
1208
1209def is_systemd_available() -> bool:
1210    """Check if running on systemd."""
1211    return os.path.isdir('/run/systemd/system')
1212
1213
1214def is_tmux_available() -> bool:
1215    """
1216    Check if `tmux` is installed.
1217    """
1218    import subprocess
1219    try:
1220        has_tmux = subprocess.call(
1221            ['tmux', '-V'],
1222            stdout=subprocess.DEVNULL,
1223            stderr=subprocess.STDOUT
1224        ) == 0
1225    except FileNotFoundError:
1226        has_tmux = False
1227    except Exception:
1228        has_tmux = False
1229    return has_tmux
1230
1231def get_last_n_lines(file_name: str, N: int):
1232    """
1233    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/
1234    """
1235    # Create an empty list to keep the track of last N lines
1236    list_of_lines = []
1237    # Open file for reading in binary mode
1238    with open(file_name, 'rb') as read_obj:
1239        # Move the cursor to the end of the file
1240        read_obj.seek(0, os.SEEK_END)
1241        # Create a buffer to keep the last read line
1242        buffer = bytearray()
1243        # Get the current position of pointer i.e eof
1244        pointer_location = read_obj.tell()
1245        # Loop till pointer reaches the top of the file
1246        while pointer_location >= 0:
1247            # Move the file pointer to the location pointed by pointer_location
1248            read_obj.seek(pointer_location)
1249            # Shift pointer location by -1
1250            pointer_location = pointer_location -1
1251            # read that byte / character
1252            new_byte = read_obj.read(1)
1253            # If the read byte is new line character then it means one line is read
1254            if new_byte == b'\n':
1255                # Save the line in list of lines
1256                list_of_lines.append(buffer.decode()[::-1])
1257                # If the size of list reaches N, then return the reversed list
1258                if len(list_of_lines) == N:
1259                    return list(reversed(list_of_lines))
1260                # Reinitialize the byte array to save next line
1261                buffer = bytearray()
1262            else:
1263                # If last read character is not eol then add it in buffer
1264                buffer.extend(new_byte)
1265        # As file is read completely, if there is still data in buffer, then its first line.
1266        if len(buffer) > 0:
1267            list_of_lines.append(buffer.decode()[::-1])
1268    # return the reversed list
1269    return list(reversed(list_of_lines))
1270
1271
1272def tail(f, n, offset=None):
1273    """
1274    https://stackoverflow.com/a/692616/9699829
1275    
1276    Reads n lines from f with an offset of offset lines.  The return
1277    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
1278    an indicator that is `True` if there are more lines in the file.
1279    """
1280    avg_line_length = 74
1281    to_read = n + (offset or 0)
1282
1283    while True:
1284        try:
1285            f.seek(-(avg_line_length * to_read), 2)
1286        except IOError:
1287            # woops.  apparently file is smaller than what we want
1288            # to step back, go to the beginning instead
1289            f.seek(0)
1290        pos = f.tell()
1291        lines = f.read().splitlines()
1292        if len(lines) >= to_read or pos == 0:
1293            return lines[-to_read:offset and -offset or None], \
1294                   len(lines) > to_read or pos > 0
1295        avg_line_length *= 1.3
1296
1297
1298def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1299    """
1300    Remove characters from each section of a string until the length is within the limit.
1301
1302    Parameters
1303    ----------
1304    item: str
1305        The item name to be truncated.
1306
1307    delimeter: str, default '_'
1308        Split `item` by this string into several sections.
1309
1310    max_len: int, default 128
1311        The max acceptable length of the truncated version of `item`.
1312
1313    Returns
1314    -------
1315    The truncated string.
1316
1317    Examples
1318    --------
1319    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1320    'ab_de_gh'
1321
1322    """
1323    if len(item) < max_len:
1324        return item
1325
1326    def _shorten(s: str) -> str:
1327        return s[:-1] if len(s) > 1 else s
1328
1329    sections = list(enumerate(item.split(delimeter)))
1330    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1331    available_chars = max_len - len(sections)
1332
1333    _sections = [(i, s) for i, s in sorted_sections]
1334    _sections_len = sum([len(s) for i, s in _sections])
1335    _old_sections_len = _sections_len
1336    while _sections_len > available_chars:
1337        _sections = [(i, _shorten(s)) for i, s in _sections]
1338        _old_sections_len = _sections_len
1339        _sections_len = sum([len(s) for i, s in _sections])
1340        if _old_sections_len == _sections_len:
1341            raise Exception(f"String could not be truncated: '{item}'")
1342
1343    new_sections = sorted(_sections, key=lambda x: x[0])
1344    return delimeter.join([s for i, s in new_sections])
1345
1346
1347def truncate_text_for_display(
1348    text: str,
1349    max_length: int = 50,
1350    suffix: str = '…',
1351) -> str:
1352    """
1353    Truncate a potentially long string for display purposes.
1354
1355    Parameters
1356    ----------
1357    text: str
1358        The string to be truncated.
1359
1360    max_length: int, default 60
1361        The maximum length of `text` before truncation.
1362
1363    suffix: str, default '…'
1364        The string to append to the length of `text` to indicate truncation.
1365
1366    Returns
1367    -------
1368    A string of length `max_length` or less.
1369    """
1370    text_length = len(text)
1371    if text_length <= max_length:
1372        return text
1373
1374    suffix_length = len(suffix)
1375
1376    truncated_text = text[:max_length - suffix_length]
1377    return truncated_text + suffix
1378
1379
1380def separate_negation_values(
1381    vals: Union[List[str], Tuple[str]],
1382    negation_prefix: Optional[str] = None,
1383) -> Tuple[List[str], List[str]]:
1384    """
1385    Separate the negated values from the positive ones.
1386    Return two lists: positive and negative values.
1387
1388    Parameters
1389    ----------
1390    vals: Union[List[str], Tuple[str]]
1391        A list of strings to parse.
1392
1393    negation_prefix: Optional[str], default None
1394        Include values that start with this string in the second list.
1395        If `None`, use the system default (`_`).
1396    """
1397    if negation_prefix is None:
1398        from meerschaum._internal.static import STATIC_CONFIG
1399        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
1400    _in_vals, _ex_vals = [], []
1401    for v in vals:
1402        if str(v).startswith(negation_prefix):
1403            _ex_vals.append(str(v)[len(negation_prefix):])
1404        else:
1405            _in_vals.append(v)
1406
1407    return _in_vals, _ex_vals
1408
1409
1410def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1411    """
1412    Translate a params dictionary into lists of include- and exclude-values.
1413
1414    Parameters
1415    ----------
1416    params: Optional[Dict[str, Any]]
1417        A params query dictionary.
1418
1419    Returns
1420    -------
1421    A dictionary mapping keys to a tuple of lists for include and exclude values.
1422
1423    Examples
1424    --------
1425    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
1426    {'a': (['b', 'c', 'e'], ['d', 'f'])}
1427    """
1428    if not params:
1429        return {}
1430    return {
1431        col: separate_negation_values(
1432            (
1433                val
1434                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
1435                else [val]
1436            )
1437        )
1438        for col, val in params.items()
1439    }
1440
1441
1442def flatten_list(list_: List[Any]) -> List[Any]:
1443    """
1444    Recursively flatten a list.
1445    """
1446    for item in list_:
1447        if isinstance(item, list):
1448            yield from flatten_list(item)
1449        else:
1450            yield item
1451
1452
1453def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1454    """
1455    Parse a string containing the text to be passed into a function
1456    and return a tuple of args, kwargs.
1457
1458    Parameters
1459    ----------
1460    args_str: str
1461        The contents of the function parameter (as a string).
1462
1463    Returns
1464    -------
1465    A tuple of args (tuple) and kwargs (dict[str, Any]).
1466
1467    Examples
1468    --------
1469    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
1470    (123, 456), {'foo': 789, 'bar': 'baz'}
1471    """
1472    import ast
1473    args = []
1474    kwargs = {}
1475
1476    for part in args_str.split(','):
1477        if '=' in part:
1478            key, val = part.split('=', 1)
1479            kwargs[key.strip()] = ast.literal_eval(val)
1480        else:
1481            args.append(ast.literal_eval(part.strip()))
1482
1483    return tuple(args), kwargs
1484
1485
1486def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
1487    """
1488    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.
1489
1490    Parameters
1491    ----------
1492    src_path: pathlib.Path
1493        The source path.
1494
1495    dest_path: pathlib.Path
1496        The destination path.
1497
1498    Returns
1499    -------
1500    A SuccessTuple indicating success.
1501    """
1502    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
1503        return True, "Symlink already exists."
1504    try:
1505        dest_path.symlink_to(src_path)
1506        success = True
1507    except Exception as e:
1508        success = False
1509        msg = str(e)
1510    if success:
1511        return success, "Success"
1512
1513    ### Failed to create a symlink.
1514    ### If we're not on Windows, return an error.
1515    import platform
1516    if platform.system() != 'Windows':
1517        return success, msg
1518
1519    try:
1520        import _winapi
1521    except ImportError:
1522        return False, "Unable to import _winapi."
1523
1524    if src_path.is_dir():
1525        try:
1526            _winapi.CreateJunction(str(src_path), str(dest_path))
1527        except Exception as e:
1528            return False, str(e)
1529        return True, "Success"
1530
1531    ### Last resort: copy the file on Windows.
1532    import shutil
1533    try:
1534        shutil.copy(src_path, dest_path)
1535    except Exception as e:
1536        return False, str(e)
1537
1538    return True, "Success"
1539
1540
1541def is_symlink(path: pathlib.Path) -> bool:
1542    """
1543    Wrap `path.is_symlink()` but add support for Windows junctions.
1544    """
1545    if path.is_symlink():
1546        return True
1547
1548    import platform
1549    if platform.system() != 'Windows':
1550        return False
1551    try:
1552        return bool(os.readlink(path))
1553    except OSError:
1554        return False
1555
1556
1557def parametrized(dec):
1558    """
1559    A meta-decorator for allowing other decorator functions to have parameters.
1560
1561    https://stackoverflow.com/a/26151604/9699829
1562    """
1563    def layer(*args, **kwargs):
1564        def repl(f):
1565            return dec(f, *args, **kwargs)
1566        return repl
1567    return layer
1568
1569
1570def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
1571    """
1572    Safely extract a TAR file to a give directory.
1573    This defends against CVE-2007-4559.
1574
1575    Parameters
1576    ----------
1577    tarf: file
1578        The TAR file opened with `tarfile.open(path, 'r:gz')`.
1579
1580    output_dir: Union[str, pathlib.Path]
1581        The output directory.
1582    """
1583
1584    def is_within_directory(directory, target):
1585        abs_directory = os.path.abspath(directory)
1586        abs_target = os.path.abspath(target)
1587        prefix = os.path.commonprefix([abs_directory, abs_target])
1588        return prefix == abs_directory 
1589
1590    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
1591        for member in tar.getmembers():
1592            member_path = os.path.join(path, member.name)
1593            if not is_within_directory(path, member_path):
1594                raise Exception("Attempted Path Traversal in Tar File")
1595
1596        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)
1597
1598    return safe_extract(tarf, output_dir)
1599
1600
1601def to_snake_case(name: str) -> str:
1602    """
1603    Return the given string in snake-case-style.
1604
1605    Parameters
1606    ----------
1607    name: str
1608        The input text to convert to snake case.
1609
1610    Returns
1611    -------
1612    A snake-case version of `name`.
1613
1614    Examples
1615    --------
1616    >>> to_snake_case("HelloWorld!")
1617    'hello_world'
1618    >>> to_snake_case("This has spaces in it.")
1619    'this_has_spaces_in_it'
1620    >>> to_snake_case("already_in_snake_case")
1621    'already_in_snake_case'
1622    """
1623    import re
1624    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
1625    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
1626    name = re.sub(r'[^\w\s]', '', name)
1627    name = re.sub(r'\s+', '_', name)
1628    return name.lower()
1629
1630
1631def get_directory_size(path: Path) -> int:
1632    """
1633    Return the cumulative size of a directory's files in bytes.
1634    https://stackoverflow.com/a/55659577/9699829
1635    """
1636    return sum(file.stat().st_size for file in path.rglob('*'))
1637
1638
1639##################
1640# Legacy imports #
1641##################
1642
1643def choose_subaction(*args, **kwargs) -> Any:
1644    """
1645    Placeholder function to prevent breaking legacy behavior.
1646    See `meerschaum.actions.choose_subaction`.
1647    """
1648    from meerschaum.actions import choose_subaction as _choose_subactions
1649    return _choose_subactions(*args, **kwargs)
1650
1651
1652def print_options(*args, **kwargs) -> None:
1653    """
1654    Placeholder function to prevent breaking legacy behavior.
1655    See `meerschaum.utils.formatting.print_options`.
1656    """
1657    from meerschaum.utils.formatting import print_options as _print_options
1658    return _print_options(*args, **kwargs)
1659
1660
1661def to_pandas_dtype(*args, **kwargs) -> Any:
1662    """
1663    Placeholder function to prevent breaking legacy behavior.
1664    See `meerschaum.utils.dtypes.to_pandas_dtype`.
1665    """
1666    from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype
1667    return _to_pandas_dtype(*args, **kwargs)
1668
1669
1670def filter_unseen_df(*args, **kwargs) -> Any:
1671    """
1672    Placeholder function to prevent breaking legacy behavior.
1673    See `meerschaum.utils.dataframe.filter_unseen_df`.
1674    """
1675    from meerschaum.utils.dataframe import filter_unseen_df as real_function
1676    return real_function(*args, **kwargs)
1677
1678
1679def add_missing_cols_to_df(*args, **kwargs) -> Any:
1680    """
1681    Placeholder function to prevent breaking legacy behavior.
1682    See `meerschaum.utils.dataframe.add_missing_cols_to_df`.
1683    """
1684    from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function
1685    return real_function(*args, **kwargs)
1686
1687
1688def parse_df_datetimes(*args, **kwargs) -> Any:
1689    """
1690    Placeholder function to prevent breaking legacy behavior.
1691    See `meerschaum.utils.dataframe.parse_df_datetimes`.
1692    """
1693    from meerschaum.utils.dataframe import parse_df_datetimes as real_function
1694    return real_function(*args, **kwargs)
1695
1696
1697def df_from_literal(*args, **kwargs) -> Any:
1698    """
1699    Placeholder function to prevent breaking legacy behavior.
1700    See `meerschaum.utils.dataframe.df_from_literal`.
1701    """
1702    from meerschaum.utils.dataframe import df_from_literal as real_function
1703    return real_function(*args, **kwargs)
1704
1705
1706def get_json_cols(*args, **kwargs) -> Any:
1707    """
1708    Placeholder function to prevent breaking legacy behavior.
1709    See `meerschaum.utils.dataframe.get_json_cols`.
1710    """
1711    from meerschaum.utils.dataframe import get_json_cols as real_function
1712    return real_function(*args, **kwargs)
1713
1714
1715def get_unhashable_cols(*args, **kwargs) -> Any:
1716    """
1717    Placeholder function to prevent breaking legacy behavior.
1718    See `meerschaum.utils.dataframe.get_unhashable_cols`.
1719    """
1720    from meerschaum.utils.dataframe import get_unhashable_cols as real_function
1721    return real_function(*args, **kwargs)
1722
1723
1724def enforce_dtypes(*args, **kwargs) -> Any:
1725    """
1726    Placeholder function to prevent breaking legacy behavior.
1727    See `meerschaum.utils.dataframe.enforce_dtypes`.
1728    """
1729    from meerschaum.utils.dataframe import enforce_dtypes as real_function
1730    return real_function(*args, **kwargs)
1731
1732
1733def get_datetime_bound_from_df(*args, **kwargs) -> Any:
1734    """
1735    Placeholder function to prevent breaking legacy behavior.
1736    See `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
1737    """
1738    from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function
1739    return real_function(*args, **kwargs)
1740
1741
1742def df_is_chunk_generator(*args, **kwargs) -> Any:
1743    """
1744    Placeholder function to prevent breaking legacy behavior.
1745    See `meerschaum.utils.dataframe.df_is_chunk_generator`.
1746    """
1747    from meerschaum.utils.dataframe import df_is_chunk_generator as real_function
1748    return real_function(*args, **kwargs)
1749
1750
1751def choices_docstring(*args, **kwargs) -> Any:
1752    """
1753    Placeholder function to prevent breaking legacy behavior.
1754    See `meerschaum.actions.choices_docstring`.
1755    """
1756    from meerschaum.actions import choices_docstring as real_function
1757    return real_function(*args, **kwargs)
1758
1759
1760def _get_subaction_names(*args, **kwargs) -> Any:
1761    """
1762    Placeholder function to prevent breaking legacy behavior.
1763    See `meerschaum.actions._get_subaction_names`.
1764    """
1765    from meerschaum.actions import _get_subaction_names as real_function
1766    return real_function(*args, **kwargs)
1767
1768
1769def json_serialize_datetime(dt: datetime) -> Union[str, None]:
1770    """
1771    Serialize a datetime object into JSON (ISO format string).
1772
1773    Examples
1774    --------
1775    >>> import json
1776    >>> from datetime import datetime
1777    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
1778    '{"a": "2022-01-01T00:00:00Z"}'
1779
1780    """
1781    from meerschaum.utils.dtypes import serialize_datetime
1782    return serialize_datetime(dt)
1783
1784
1785def replace_pipes_in_dict(*args, **kwargs):
1786    """
1787    Placeholder function to prevent breaking legacy behavior.
1788    See `meerschaum.utils.pipes.replace_pipes_in_dict`.
1789    """
1790    from meerschaum.utils.pipes import replace_pipes_in_dict
1791    return replace_pipes_in_dict(*args, **kwargs)
1792
1793
1794def is_pipe_registered(*args, **kwargs):
1795    """
1796    Placeholder function to prevent breaking legacy behavior.
1797    See `meerschaum.utils.pipes.is_pipe_registered`.
1798    """
1799    from meerschaum.utils.pipes import is_pipe_registered
1800    return is_pipe_registered(*args, **kwargs)
1801
1802
1803def round_time(*args, **kwargs):
1804    """
1805    Placeholder function to prevent breaking legacy behavior.
1806    See `meerschaum.utils.dtypes.round_time`.
1807    """
1808    from meerschaum.utils.dtypes import round_time
1809    return round_time(*args, **kwargs)
1810
1811
1812def flatten_pipes_dict(*args, **kwargs):
1813    """
1814    Placeholder function to prevent breaking legacy behavior.
1815    See `meerschaum.utils.pipes.flatten_pipes_dict`.
1816    """
1817    from meerschaum.utils.pipes import flatten_pipes_dict
1818    return flatten_pipes_dict(*args, **kwargs)
1819
1820
1821_current_module = sys.modules[__name__]
1822__all__ = tuple(
1823    name
1824    for name, obj in globals().items()
1825    if callable(obj)
1826        and name not in __pdoc__
1827        and getattr(obj, '__module__', None) == _current_module.__name__
1828        and not name.startswith('_')
1829)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
52def add_method_to_class(
53    func: Callable[[Any], Any],
54    class_def: 'Class',
55    method_name: Optional[str] = None,
56    keep_self: Optional[bool] = None,
57) -> Callable[[Any], Any]:
58    """
59    Add function `func` to class `class_def`.
60
61    Parameters
62    ----------
63    func: Callable[[Any], Any]
64        Function to be added as a method of the class
65
66    class_def: Class
67        Class to be modified.
68
69    method_name: Optional[str], default None
70        New name of the method. None will use func.__name__ (default).
71
72    Returns
73    -------
74    The modified function object.
75
76    """
77    from functools import wraps
78
79    is_class = isinstance(class_def, type)
80    
81    @wraps(func)
82    def wrapper(self, *args, **kw):
83        return func(*args, **kw)
84
85    if method_name is None:
86        method_name = func.__name__
87
88    setattr(class_def, method_name, (
89            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
90        )
91    )
92
93    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 96def generate_password(length: int = 12) -> str:
 97    """
 98    Generate a secure password of given length.
 99
100    Parameters
101    ----------
102    length: int, default 12
103        The length of the password.
104
105    Returns
106    -------
107    A random password string.
108    """
109    import secrets
110    import string
111    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
114def is_int(s: str) -> bool:
115    """
116    Check if string is an int.
117
118    Parameters
119    ----------
120    s: str
121        The string to be checked.
122        
123    Returns
124    -------
125    A bool indicating whether the string was able to be cast to an integer.
126
127    """
128    try:
129        return float(s).is_integer()
130    except Exception:
131        return False

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def is_uuid(s: str) -> bool:
134def is_uuid(s: str) -> bool:
135    """
136    Check if a string is a valid UUID.
137
138    Parameters
139    ----------
140    s: str
141        The string to be checked.
142
143    Returns
144    -------
145    A bool indicating whether the string is a valid UUID.
146    """
147    import uuid
148    try:
149        uuid.UUID(str(s))
150        return True
151    except Exception:
152        return False

Check if a string is a valid UUID.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string is a valid UUID.
def string_to_dict(params_string: str) -> Dict[str, Any]:
155def string_to_dict(params_string: str) -> Dict[str, Any]:
156    """
157    Parse a string into a dictionary.
158    
159    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
160
161    Parameters
162    ----------
163    params_string: str
164        The string to be parsed.
165        
166    Returns
167    -------
168    The parsed dictionary.
169
170    Examples
171    --------
172    >>> string_to_dict("a:1,b:2") 
173    {'a': 1, 'b': 2}
174    >>> string_to_dict('{"a": 1, "b": 2}')
175    {'a': 1, 'b': 2}
176
177    """
178    if not params_string:
179        return {}
180
181    import json
182
183    ### Kind of a weird edge case.
184    ### In the generated compose file, there is some weird escaping happening,
185    ### so the string to be parsed starts and ends with a single quote.
186    if (
187        isinstance(params_string, str)
188        and len(params_string) > 4
189        and params_string[1] == "{"
190        and params_string[-2] == "}"
191    ):
192        return json.loads(params_string[1:-1])
193
194    if str(params_string).startswith('{'):
195        return json.loads(params_string)
196
197    import ast
198    params_dict = {}
199    
200    items = []
201    bracket_level = 0
202    brace_level = 0
203    current_item = ''
204    in_quotes = False
205    quote_char = ''
206    
207    i = 0
208    while i < len(params_string):
209        char = params_string[i]
210        
211        if in_quotes:
212            if char == quote_char and (i == 0 or params_string[i-1] != '\\'):
213                in_quotes = False
214        else:
215            if char in ('"', "'"):
216                in_quotes = True
217                quote_char = char
218            elif char == '[':
219                bracket_level += 1
220            elif char == ']':
221                bracket_level -= 1
222            elif char == '{':
223                brace_level += 1
224            elif char == '}':
225                brace_level -= 1
226            elif char == ',' and bracket_level == 0 and brace_level == 0:
227                items.append(current_item)
228                current_item = ''
229                i += 1
230                continue
231        
232        current_item += char
233        i += 1
234        
235    if current_item:
236        items.append(current_item)
237
238    for param in items:
239        param = param.strip()
240        if not param:
241            continue
242
243        _keys = param.split(":", maxsplit=1)
244        if len(_keys) != 2:
245            continue
246
247        keys = _keys[:-1]
248        try:
249            val = ast.literal_eval(_keys[-1])
250        except Exception:
251            val = str(_keys[-1])
252
253        c = params_dict
254        for _k in keys[:-1]:
255            try:
256                k = ast.literal_eval(_k)
257            except Exception:
258                k = str(_k)
259            if k not in c:
260                c[k] = {}
261            c = c[k]
262
263        c[keys[-1]] = val
264
265    return params_dict

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def to_simple_dict(doc: Dict[str, Any]) -> str:
268def to_simple_dict(doc: Dict[str, Any]) -> str:
269    """
270    Serialize a document dictionary in simple-dict format.
271    """
272    import json
273    import ast
274    from meerschaum.utils.dtypes import json_serialize_value
275
276    def serialize_value(value):
277        if isinstance(value, str):
278            try:
279                evaluated = ast.literal_eval(value)
280                if not isinstance(evaluated, str):
281                    return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
282                return value
283            except (ValueError, SyntaxError, TypeError, MemoryError):
284                return value
285        
286        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)
287
288    return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items())

Serialize a document dictionary in simple-dict format.

def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
291def parse_config_substitution(
292    value: str,
293    leading_key: str = 'MRSM',
294    begin_key: str = '{',
295    end_key: str = '}',
296    delimeter: str = ':',
297) -> List[Any]:
298    """
299    Parse Meerschaum substitution syntax
300    E.g. MRSM{value1:value2} => ['value1', 'value2']
301    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
302    """
303    if not value.beginswith(leading_key):
304        return value
305
306    return leading_key[len(leading_key):][len():-1].split(delimeter)

Parse Meerschaum substitution syntax E.g. MRSM{value1:value2} => ['value1', 'value2'] NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
309def edit_file(
310    path: Union[pathlib.Path, str],
311    default_editor: str = 'pyvim',
312    debug: bool = False
313) -> bool:
314    """
315    Open a file for editing.
316
317    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
318
319    Parameters
320    ----------
321    path: Union[pathlib.Path, str]
322        The path to the file to be edited.
323
324    default_editor: str, default 'pyvim'
325        If `$EDITOR` is not set, use this instead.
326        If `pyvim` is not installed, it will install it from PyPI.
327
328    debug: bool, default False
329        Verbosity toggle.
330
331    Returns
332    -------
333    A bool indicating the file was successfully edited.
334    """
335    from subprocess import call
336    from meerschaum.utils.debug import dprint
337    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
338    try:
339        EDITOR = os.environ.get('EDITOR', default_editor)
340        if debug:
341            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
342        rc = call([EDITOR, path])
343    except Exception as e: ### can't open with default editors
344        if debug:
345            dprint(str(e))
346            dprint('Failed to open file with system editor. Falling back to pyvim...')
347        pyvim = attempt_import('pyvim', lazy=False)
348        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
349    return rc == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
352def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
353    """
354    Determine the columns and lines in the terminal.
355    If they cannot be determined, return the default values (100 columns and 120 lines).
356
357    Parameters
358    ----------
359    default_cols: int, default 100
360        If the columns cannot be determined, return this value.
361
362    default_lines: int, default 120
363        If the lines cannot be determined, return this value.
364
365    Returns
366    -------
367    A tuple if integers for the columns and lines.
368    """
369    try:
370        size = os.get_terminal_size()
371        _cols, _lines = size.columns, size.lines
372    except Exception:
373        _cols, _lines = (
374            int(os.environ.get('COLUMNS', str(default_cols))),
375            int(os.environ.get('LINES', str(default_lines))),
376        )
377    return _cols, _lines

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple if integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
380def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
381    """
382    Iterate over a list in chunks.
383    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
384
385    Parameters
386    ----------
387    iterable: Iterable[Any]
388        The iterable to iterate over in chunks.
389        
390    chunksize: int
391        The size of chunks to iterate with.
392        
393    fillvalue: Optional[Any], default None
394        If the chunks do not evenly divide into the iterable, pad the end with this value.
395
396    Returns
397    -------
398    A generator of tuples of size `chunksize`.
399
400    """
401    from itertools import zip_longest
402    args = [iter(iterable)] * chunksize
403    return zip_longest(*args, fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
405def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
406    """
407    Sort a dictionary's values and return a new dictionary.
408
409    Parameters
410    ----------
411    d: Dict[Any, Any]
412        The dictionary to be sorted.
413
414    Returns
415    -------
416    A sorted dictionary.
417
418    Examples
419    --------
420    >>> sorted_dict({'b': 1, 'a': 2})
421    {'b': 1, 'a': 2}
422    >>> sorted_dict({'b': 2, 'a': 1})
423    {'a': 1, 'b': 2}
424
425    """
426    try:
427        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
428    except Exception:
429        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
432def timed_input(
433    seconds: int = 10,
434    timeout_message: str = "",
435    prompt: str = "",
436    icon: bool = False,
437    **kw
438) -> Union[str, None]:
439    """
440    Accept user input only for a brief period of time.
441
442    Parameters
443    ----------
444    seconds: int, default 10
445        The number of seconds to wait.
446
447    timeout_message: str, default ''
448        The message to print after the window has elapsed.
449
450    prompt: str, default ''
451        The prompt to print during the window.
452
453    icon: bool, default False
454        If `True`, print the configured input icon.
455
456
457    Returns
458    -------
459    The input string entered by the user.
460
461    """
462    import signal, time
463
464    class TimeoutExpired(Exception):
465        """Raise this exception when the timeout is reached."""
466
467    def alarm_handler(signum, frame):
468        raise TimeoutExpired
469
470    # set signal handler
471    signal.signal(signal.SIGALRM, alarm_handler)
472    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
473
474    try:
475        return input(prompt)
476    except TimeoutExpired:
477        return None
478    except (EOFError, RuntimeError):
479        try:
480            print(prompt)
481            time.sleep(seconds)
482        except TimeoutExpired:
483            return None
484    finally:
485        signal.alarm(0) # cancel alarm

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def enforce_gevent_monkey_patch():
488def enforce_gevent_monkey_patch():
489    """
490    Check if gevent monkey patching is enabled, and if not, then apply patching.
491    """
492    from meerschaum.utils.packages import attempt_import
493    import socket
494    gevent, gevent_socket, gevent_monkey = attempt_import(
495        'gevent', 'gevent.socket', 'gevent.monkey'
496    )
497    if not socket.socket is gevent_socket.socket:
498        gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
500def is_valid_email(email: str) -> Union['re.Match', None]:
501    """
502    Check whether a string is a valid email.
503
504    Parameters
505    ----------
506    email: str
507        The string to be examined.
508        
509    Returns
510    -------
511    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
512
513    Examples
514    --------
515    >>> is_valid_email('foo')
516    >>> is_valid_email('foo@foo.com')
517    <re.Match object; span=(0, 11), match='foo@foo.com'>
518
519    """
520    import re
521    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
522    return re.search(regex, email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
525def string_width(string: str, widest: bool = True) -> int:
526    """
527    Calculate the width of a string, either by its widest or last line.
528
529    Parameters
530    ----------
531    string: str:
532        The string to be examined.
533        
534    widest: bool, default True
535        No longer used because `widest` is always assumed to be true.
536
537    Returns
538    -------
539    An integer for the text's visual width.
540
541    Examples
542    --------
543    >>> string_width('a')
544    1
545    >>> string_width('a\\nbc\\nd')
546    2
547
548    """
549    def _widest():
550        words = string.split('\n')
551        max_length = 0
552        for w in words:
553            length = len(w)
554            if length > max_length:
555                max_length = length
556        return max_length
557
558    return _widest()

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str:): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
603def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
604    """
605    Get a value from a dictionary with a tuple of keys.
606
607    Parameters
608    ----------
609    d: Dict[Any, Any]
610        The dictionary to search.
611
612    path: Tuple[Any, ...]
613        The path of keys to traverse.
614
615    Returns
616    -------
617    The value from the end of the path.
618    """
619    return functools.reduce(lambda di, key: di[key], path, d)

Get a value from a dictionary with a tuple of keys.

Parameters
  • d (Dict[Any, Any]): The dictionary to search.
  • path (Tuple[Any, ...]): The path of keys to traverse.
Returns
  • The value from the end of the path.
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
622def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
623    """
624    Set a value in a dictionary with a tuple of keys.
625
626    Parameters
627    ----------
628    d: Dict[Any, Any]
629        The dictionary to search.
630
631    path: Tuple[Any, ...]
632        The path of keys to traverse.
633
634    val: Any
635        The value to set at the end of the path.
636    """
637    get_val_from_dict_path(d, path[:-1])[path[-1]] = val

Set a value in a dictionary with a tuple of keys.

Parameters
  • d (Dict[Any, Any]): The dictionary to search.
  • path (Tuple[Any, ...]): The path of keys to traverse.
  • val (Any): The value to set at the end of the path.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
640def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
641    """
642    Recursively replace passwords in a dictionary.
643
644    Parameters
645    ----------
646    d: Dict[str, Any]
647        The dictionary to search through.
648
649    replace_with: str, default '*'
650        The string to replace each character of the password with.
651
652    Returns
653    -------
654    Another dictionary where values to the keys `'password'`
655    are replaced with `replace_with` (`'*'`).
656
657    Examples
658    --------
659    >>> replace_password({'a': 1})
660    {'a': 1}
661    >>> replace_password({'password': '123'})
662    {'password': '***'}
663    >>> replace_password({'nested': {'password': '123'}})
664    {'nested': {'password': '***'}}
665    >>> replace_password({'password': '123'}, replace_with='!')
666    {'password': '!!!'}
667
668    """
669    import copy
670    _d = copy.deepcopy(d)
671    for k, v in d.items():
672        if isinstance(v, dict):
673            _d[k] = replace_password(v)
674        elif 'password' in str(k).lower():
675            _d[k] = ''.join([replace_with for char in str(v)])
676        elif str(k).lower() == 'uri':
677            from meerschaum.connectors.sql import SQLConnector
678            try:
679                uri_params = SQLConnector.parse_uri(v)
680            except Exception:
681                uri_params = None
682            if not uri_params:
683                continue
684            if not 'username' in uri_params or not 'password' in uri_params:
685                continue
686            _d[k] = v.replace(
687                uri_params['username'] + ':' + uri_params['password'],
688                uri_params['username'] + ':' + ''.join(
689                    [replace_with for char in str(uri_params['password'])]
690                )
691            )
692    return _d

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where values to the keys 'password'
  • are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments( func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Tuple[Tuple[Any], Dict[str, Any]]:
695def filter_arguments(
696    func: Callable[[Any], Any],
697    *args: Any,
698    **kwargs: Any
699) -> Tuple[Tuple[Any], Dict[str, Any]]:
700    """
701    Filter out unsupported positional and keyword arguments.
702
703    Parameters
704    ----------
705    func: Callable[[Any], Any]
706        The function to inspect.
707
708    *args: Any
709        Positional arguments to filter and pass to `func`.
710
711    **kwargs
712        Keyword arguments to filter and pass to `func`.
713
714    Returns
715    -------
716    The `args` and `kwargs` accepted by `func`.
717    """
718    args = filter_positionals(func, *args)
719    kwargs = filter_keywords(func, **kwargs)
720    return args, kwargs

Filter out unsupported positional and keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): Positional arguments to filter and pass to func.
  • **kwargs: Keyword arguments to filter and pass to func.
Returns
  • The args and kwargs accepted by func.
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
723def filter_keywords(
724    func: Callable[[Any], Any],
725    **kw: Any
726) -> Dict[str, Any]:
727    """
728    Filter out unsupported keyword arguments.
729
730    Parameters
731    ----------
732    func: Callable[[Any], Any]
733        The function to inspect.
734
735    **kw: Any
736        The arguments to be filtered and passed into `func`.
737
738    Returns
739    -------
740    A dictionary of keyword arguments accepted by `func`.
741
742    Examples
743    --------
744    ```python
745    >>> def foo(a=1, b=2):
746    ...     return a * b
747    >>> filter_keywords(foo, a=2, b=4, c=6)
748    {'a': 2, 'b': 4}
749    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
750    8
751    ```
752
753    """
754    import inspect
755    func_params = inspect.signature(func).parameters
756    ### If the function has a **kw method, skip filtering.
757    for param, _type in func_params.items():
758        if '**' in str(_type):
759            return kw
760    return {k: v for k, v in kw.items() if k in func_params}

Filter out unsupported keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(func: Callable[[Any], Any], *args: Any) -> Tuple[Any]:
763def filter_positionals(
764    func: Callable[[Any], Any],
765    *args: Any
766) -> Tuple[Any]:
767    """
768    Filter out unsupported positional arguments.
769
770    Parameters
771    ----------
772    func: Callable[[Any], Any]
773        The function to inspect.
774
775    *args: Any
776        The arguments to be filtered and passed into `func`.
777        NOTE: If the function signature expects more arguments than provided,
778        the missing slots will be filled with `None`.
779
780    Returns
781    -------
782    A tuple of positional arguments accepted by `func`.
783
784    Examples
785    --------
786    ```python
787    >>> def foo(a, b):
788    ...     return a * b
789    >>> filter_positionals(foo, 2, 4, 6)
790    (2, 4)
791    >>> foo(*filter_positionals(foo, 2, 4, 6))
792    8
793    ```
794
795    """
796    import inspect
797    from meerschaum.utils.warnings import warn
798    func_params = inspect.signature(func).parameters
799    acceptable_args: List[Any] = []
800
801    def _warn_invalids(_num_invalid):
802        if _num_invalid > 0:
803            warn(
804                "Too few arguments were provided. "
805                + f"{_num_invalid} argument"
806                + ('s have ' if _num_invalid != 1 else " has ")
807                + " been filled with `None`.",
808            )
809
810    num_invalid: int = 0
811    for i, (param, val) in enumerate(func_params.items()):
812        if '=' in str(val) or '*' in str(val):
813            _warn_invalids(num_invalid)
814            return tuple(acceptable_args)
815
816        try:
817            acceptable_args.append(args[i])
818        except IndexError:
819            acceptable_args.append(None)
820            num_invalid += 1
821
822    _warn_invalids(num_invalid)
823    return tuple(acceptable_args)

Filter out unsupported positional arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
  • A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
...     return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
826def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
827    """
828    Convert an ordered dict to a dict.
829    Does not mutate the original OrderedDict.
830    """
831    from collections import OrderedDict
832    _d = dict(od)
833    for k, v in od.items():
834        if isinstance(v, OrderedDict) or (
835            issubclass(type(v), OrderedDict)
836        ):
837            _d[k] = dict_from_od(v)
838    return _d

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
841def remove_ansi(s: str) -> str:
842    """
843    Remove ANSI escape characters from a string.
844
845    Parameters
846    ----------
847    s: str:
848        The string to be cleaned.
849
850    Returns
851    -------
852    A string with the ANSI characters removed.
853
854    Examples
855    --------
856    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
857    'Hello, World!'
858
859    """
860    import re
861    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str:): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("Hello, World!")
'Hello, World!'
def get_connector_labels( *types: str, search_term: str = '', ignore_exact_match=True, _additional_options: Optional[List[str]] = None) -> List[str]:
864def get_connector_labels(
865    *types: str,
866    search_term: str = '',
867    ignore_exact_match = True,
868    _additional_options: Optional[List[str]] = None,
869) -> List[str]:
870    """
871    Read connector labels from the configuration dictionary.
872
873    Parameters
874    ----------
875    *types: str
876        The connector types.
877        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
878
879    search_term: str, default ''
880        A filter on the connectors' labels.
881
882    ignore_exact_match: bool, default True
883        If `True`, skip a connector if the search_term is an exact match.
884
885    Returns
886    -------
887    A list of the keys of defined connectors.
888
889    """
890    from meerschaum.config import get_config
891    connectors = get_config('meerschaum', 'connectors')
892
893    _types = list(types)
894    if len(_types) == 0:
895        _types = list(connectors.keys()) + ['plugin']
896
897    conns = []
898    for t in _types:
899        if t == 'plugin':
900            from meerschaum.plugins import get_data_plugins
901            conns += [
902                f'{t}:' + plugin.module.__name__.split('.')[-1]
903                for plugin in get_data_plugins()
904            ]
905            continue
906        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
907
908    if _additional_options:
909        conns += _additional_options
910
911    possibilities = [
912        c
913        for c in conns
914        if c.startswith(search_term)
915            and c != (
916                search_term if ignore_exact_match else ''
917            )
918    ]
919    return sorted(possibilities)

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
922def wget(
923    url: str,
924    dest: Optional[Union[str, 'pathlib.Path']] = None,
925    headers: Optional[Dict[str, Any]] = None,
926    color: bool = True,
927    debug: bool = False,
928    **kw: Any
929) -> 'pathlib.Path':
930    """
931    Mimic `wget` with `requests`.
932
933    Parameters
934    ----------
935    url: str
936        The URL to the resource to be downloaded.
937
938    dest: Optional[Union[str, pathlib.Path]], default None
939        The destination path of the downloaded file.
940        If `None`, save to the current directory.
941
942    color: bool, default True
943        If `debug` is `True`, print color output.
944
945    debug: bool, default False
946        Verbosity toggle.
947
948    Returns
949    -------
950    The path to the downloaded file.
951
952    """
953    from meerschaum.utils.warnings import warn, error
954    from meerschaum.utils.debug import dprint
955    import re, urllib.request
956    if headers is None:
957        headers = {}
958    request = urllib.request.Request(url, headers=headers)
959    if not color:
960        dprint = print
961    if debug:
962        dprint(f"Downloading from '{url}'...")
963    try:
964        response = urllib.request.urlopen(request)
965    except Exception as e:
966        import ssl
967        ssl._create_default_https_context = ssl._create_unverified_context
968        try:
969            response = urllib.request.urlopen(request)
970        except Exception as _e:
971            print(_e)
972            response = None
973    if response is None or response.code != 200:
974        error_msg = f"Failed to download from '{url}'."
975        if color:
976            error(error_msg)
977        else:
978            print(error_msg)
979            import sys
980            sys.exit(1)
981
982    d = response.headers.get('content-disposition', None)
983    fname = (
984        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
985        else url.split('/')[-1]
986    )
987
988    if dest is None:
989        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
990    elif isinstance(dest, str):
991        dest = pathlib.Path(dest)
992
993    with open(dest, 'wb') as f:
994        f.write(response.fp.read())
995
996    if debug:
997        dprint(f"Downloaded file '{dest}'.")
998
999    return dest

Mimic wget with requests.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
1002def async_wrap(func):
1003    """
1004    Run a synchronous function as async.
1005    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1006    """
1007    import asyncio
1008    from functools import wraps, partial
1009
1010    @wraps(func)
1011    async def run(*args, loop=None, executor=None, **kwargs):
1012        if loop is None:
1013            loop = asyncio.get_event_loop()
1014        pfunc = partial(func, *args, **kwargs)
1015        return await loop.run_in_executor(executor, pfunc)
1016    return run
def debug_trace(browser: bool = True):
1019def debug_trace(browser: bool = True):
1020    """
1021    Open a web-based debugger to trace the execution of the program.
1022
1023    This is an alias import for `meerschaum.utils.debug.debug_trace`.
1024    """
1025    from meerschaum.utils.debug import trace
1026    trace(browser=browser)

Open a web-based debugger to trace the execution of the program.

This is an alias import for meerschaum.utils.debug.debug_trace.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
1029def items_str(
1030    items: List[Any],
1031    quotes: bool = True,
1032    quote_str: str = "'",
1033    commas: bool = True,
1034    comma_str: str = ',',
1035    and_: bool = True,
1036    and_str: str = 'and',
1037    oxford_comma: bool = True,
1038    spaces: bool = True,
1039    space_str = ' ',
1040) -> str:
1041    """
1042    Return a formatted string if list items separated by commas.
1043
1044    Parameters
1045    ----------
1046    items: [List[Any]]
1047        The items to be printed as an English list.
1048
1049    quotes: bool, default True
1050        If `True`, wrap items in quotes.
1051
1052    quote_str: str, default "'"
1053        If `quotes` is `True`, prepend and append each item with this string.
1054
1055    and_: bool, default True
1056        If `True`, include the word 'and' before the final item in the list.
1057
1058    and_str: str, default 'and'
1059        If `and_` is True, insert this string where 'and' normally would in and English list.
1060
1061    oxford_comma: bool, default True
1062        If `True`, include the Oxford Comma (comma before the final 'and').
1063        Only applies when `and_` is `True`.
1064
1065    spaces: bool, default True
1066        If `True`, separate items with `space_str`
1067
1068    space_str: str, default ' '
1069        If `spaces` is `True`, separate items with this string.
1070
1071    Returns
1072    -------
1073    A string of the items as an English list.
1074
1075    Examples
1076    --------
1077    >>> items_str([1,2,3])
1078    "'1', '2', and '3'"
1079    >>> items_str([1,2,3], quotes=False)
1080    '1, 2, and 3'
1081    >>> items_str([1,2,3], and_=False)
1082    "'1', '2', '3'"
1083    >>> items_str([1,2,3], spaces=False, and_=False)
1084    "'1','2','3'"
1085    >>> items_str([1,2,3], oxford_comma=False)
1086    "'1', '2' and '3'"
1087    >>> items_str([1,2,3], quote_str=":")
1088    ':1:, :2:, and :3:'
1089    >>> items_str([1,2,3], and_str="or")
1090    "'1', '2', or '3'"
1091    >>> items_str([1,2,3], space_str="_")
1092    "'1',_'2',_and_'3'"
1093
1094    """
1095    if not items:
1096        return ''
1097    
1098    q = quote_str if quotes else ''
1099    s = space_str if spaces else ''
1100    a = and_str if and_ else ''
1101    c = comma_str if commas else ''
1102
1103    if len(items) == 1:
1104        return q + str(list(items)[0]) + q
1105
1106    if len(items) == 2:
1107        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1108
1109    sep = q + c + s + q
1110    output = q + sep.join(str(i) for i in items[:-1]) + q
1111    if oxford_comma:
1112        output += c
1113    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1114    return output

Return a formatted string if list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in and English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int], round_unit: bool = False) -> str:
1117def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
1118    """
1119    Return a human-readable string for a `timedelta` (or `int` minutes).
1120
1121    Parameters
1122    ----------
1123    delta: Union[timedelta, int]
1124        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
1125
1126    round_unit: bool, default False
1127        If `True`, round the output to a single unit.
1128
1129    Returns
1130    -------
1131    A formatted string, fit for human eyes.
1132    """
1133    from meerschaum.utils.packages import attempt_import
1134    if is_int(str(delta)) and not round_unit:
1135        return str(delta)
1136
1137    humanfriendly = attempt_import('humanfriendly', lazy=False)
1138    delta_seconds = (
1139        delta.total_seconds()
1140        if hasattr(delta, 'total_seconds')
1141        else (delta * 60)
1142    )
1143
1144    is_negative = delta_seconds < 0
1145    delta_seconds = abs(delta_seconds)
1146    replace_units = {}
1147
1148    if round_unit:
1149        if delta_seconds < 1:
1150            delta_seconds = round(delta_seconds, 2)
1151        elif delta_seconds < 60:
1152            delta_seconds = int(delta_seconds)
1153        elif delta_seconds < 3600:
1154            delta_seconds = int(delta_seconds / 60) * 60
1155        elif delta_seconds < 86400:
1156            delta_seconds = int(delta_seconds / 3600) * 3600
1157        elif delta_seconds < (86400 * 7):
1158            delta_seconds = int(delta_seconds / 86400) * 86400
1159        elif delta_seconds < (86400 * 7 * 4):
1160            delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7)
1161        elif delta_seconds < (86400 * 7 * 4 * 13):
1162            delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7)
1163            replace_units['weeks'] = 'months'
1164        else:
1165            delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364)
1166
1167    delta_str = humanfriendly.format_timespan(delta_seconds)
1168    if ',' in delta_str and round_unit:
1169        delta_str = delta_str.split(',')[0]
1170    elif ' and ' in delta_str and round_unit:
1171        delta_str = delta_str.split(' and ')[0]
1172
1173    for parsed_unit, replacement_unit in replace_units.items():
1174        delta_str = delta_str.replace(parsed_unit, replacement_unit)
1175
1176    return delta_str + (' ago' if is_negative else '')

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
  • round_unit (bool, default False): If True, round the output to a single unit.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
1179def is_docker_available() -> bool:
1180    """Check if we can connect to the Docker engine."""
1181    import subprocess
1182    try:
1183        has_docker = subprocess.call(
1184            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1185        ) == 0
1186    except Exception:
1187        has_docker = False
1188    return has_docker

Check if we can connect to the Docker engine.

def is_android() -> bool:
1191def is_android() -> bool:
1192    """Return `True` if the current platform is Android."""
1193    import sys
1194    return hasattr(sys, 'getandroidapilevel')

Return True if the current platform is Android.

def is_bcp_available() -> bool:
1197def is_bcp_available() -> bool:
1198    """Check if the MSSQL `bcp` utility is installed."""
1199    import subprocess
1200
1201    try:
1202        has_bcp = subprocess.call(
1203            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1204        ) == 0
1205    except Exception:
1206        has_bcp = False
1207    return has_bcp

Check if the MSSQL bcp utility is installed.

def is_systemd_available() -> bool:
1210def is_systemd_available() -> bool:
1211    """Check if running on systemd."""
1212    return os.path.isdir('/run/systemd/system')

Check if running on systemd.

def is_tmux_available() -> bool:
1215def is_tmux_available() -> bool:
1216    """
1217    Check if `tmux` is installed.
1218    """
1219    import subprocess
1220    try:
1221        has_tmux = subprocess.call(
1222            ['tmux', '-V'],
1223            stdout=subprocess.DEVNULL,
1224            stderr=subprocess.STDOUT
1225        ) == 0
1226    except FileNotFoundError:
1227        has_tmux = False
1228    except Exception:
1229        has_tmux = False
1230    return has_tmux

Check if tmux is installed.

def get_last_n_lines(file_name: str, N: int):
1232def get_last_n_lines(file_name: str, N: int):
1233    """
1234    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/
1235    """
1236    # Create an empty list to keep the track of last N lines
1237    list_of_lines = []
1238    # Open file for reading in binary mode
1239    with open(file_name, 'rb') as read_obj:
1240        # Move the cursor to the end of the file
1241        read_obj.seek(0, os.SEEK_END)
1242        # Create a buffer to keep the last read line
1243        buffer = bytearray()
1244        # Get the current position of pointer i.e eof
1245        pointer_location = read_obj.tell()
1246        # Loop till pointer reaches the top of the file
1247        while pointer_location >= 0:
1248            # Move the file pointer to the location pointed by pointer_location
1249            read_obj.seek(pointer_location)
1250            # Shift pointer location by -1
1251            pointer_location = pointer_location -1
1252            # read that byte / character
1253            new_byte = read_obj.read(1)
1254            # If the read byte is new line character then it means one line is read
1255            if new_byte == b'\n':
1256                # Save the line in list of lines
1257                list_of_lines.append(buffer.decode()[::-1])
1258                # If the size of list reaches N, then return the reversed list
1259                if len(list_of_lines) == N:
1260                    return list(reversed(list_of_lines))
1261                # Reinitialize the byte array to save next line
1262                buffer = bytearray()
1263            else:
1264                # If last read character is not eol then add it in buffer
1265                buffer.extend(new_byte)
1266        # As file is read completely, if there is still data in buffer, then its first line.
1267        if len(buffer) > 0:
1268            list_of_lines.append(buffer.decode()[::-1])
1269    # return the reversed list
1270    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
1273def tail(f, n, offset=None):
1274    """
1275    https://stackoverflow.com/a/692616/9699829
1276    
1277    Reads n lines from f with an offset of offset lines.  The return
1278    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
1279    an indicator that is `True` if there are more lines in the file.
1280    """
1281    avg_line_length = 74
1282    to_read = n + (offset or 0)
1283
1284    while True:
1285        try:
1286            f.seek(-(avg_line_length * to_read), 2)
1287        except IOError:
1288            # woops.  apparently file is smaller than what we want
1289            # to step back, go to the beginning instead
1290            f.seek(0)
1291        pos = f.tell()
1292        lines = f.read().splitlines()
1293        if len(lines) >= to_read or pos == 0:
1294            return lines[-to_read:offset and -offset or None], \
1295                   len(lines) > to_read or pos > 0
1296        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1299def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1300    """
1301    Remove characters from each section of a string until the length is within the limit.
1302
1303    Parameters
1304    ----------
1305    item: str
1306        The item name to be truncated.
1307
1308    delimeter: str, default '_'
1309        Split `item` by this string into several sections.
1310
1311    max_len: int, default 128
1312        The max acceptable length of the truncated version of `item`.
1313
1314    Returns
1315    -------
1316    The truncated string.
1317
1318    Examples
1319    --------
1320    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1321    'ab_de_gh'
1322
1323    """
1324    if len(item) < max_len:
1325        return item
1326
1327    def _shorten(s: str) -> str:
1328        return s[:-1] if len(s) > 1 else s
1329
1330    sections = list(enumerate(item.split(delimeter)))
1331    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1332    available_chars = max_len - len(sections)
1333
1334    _sections = [(i, s) for i, s in sorted_sections]
1335    _sections_len = sum([len(s) for i, s in _sections])
1336    _old_sections_len = _sections_len
1337    while _sections_len > available_chars:
1338        _sections = [(i, _shorten(s)) for i, s in _sections]
1339        _old_sections_len = _sections_len
1340        _sections_len = sum([len(s) for i, s in _sections])
1341        if _old_sections_len == _sections_len:
1342            raise Exception(f"String could not be truncated: '{item}'")
1343
1344    new_sections = sorted(_sections, key=lambda x: x[0])
1345    return delimeter.join([s for i, s in new_sections])

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def truncate_text_for_display(text: str, max_length: int = 50, suffix: str = '…') -> str:
1348def truncate_text_for_display(
1349    text: str,
1350    max_length: int = 50,
1351    suffix: str = '…',
1352) -> str:
1353    """
1354    Truncate a potentially long string for display purposes.
1355
1356    Parameters
1357    ----------
1358    text: str
1359        The string to be truncated.
1360
1361    max_length: int, default 60
1362        The maximum length of `text` before truncation.
1363
1364    suffix: str, default '…'
1365        The string to append to the length of `text` to indicate truncation.
1366
1367    Returns
1368    -------
1369    A string of length `max_length` or less.
1370    """
1371    text_length = len(text)
1372    if text_length <= max_length:
1373        return text
1374
1375    suffix_length = len(suffix)
1376
1377    truncated_text = text[:max_length - suffix_length]
1378    return truncated_text + suffix

Truncate a potentially long string for display purposes.

Parameters
  • text (str): The string to be truncated.
  • max_length (int, default 60): The maximum length of text before truncation.
  • suffix (str, default '…'): The string to append to the length of text to indicate truncation.
Returns
  • A string of length max_length or less.
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
1381def separate_negation_values(
1382    vals: Union[List[str], Tuple[str]],
1383    negation_prefix: Optional[str] = None,
1384) -> Tuple[List[str], List[str]]:
1385    """
1386    Separate the negated values from the positive ones.
1387    Return two lists: positive and negative values.
1388
1389    Parameters
1390    ----------
1391    vals: Union[List[str], Tuple[str]]
1392        A list of strings to parse.
1393
1394    negation_prefix: Optional[str], default None
1395        Include values that start with this string in the second list.
1396        If `None`, use the system default (`_`).
1397    """
1398    if negation_prefix is None:
1399        from meerschaum._internal.static import STATIC_CONFIG
1400        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
1401    _in_vals, _ex_vals = [], []
1402    for v in vals:
1403        if str(v).startswith(negation_prefix):
1404            _ex_vals.append(str(v)[len(negation_prefix):])
1405        else:
1406            _in_vals.append(v)
1407
1408    return _in_vals, _ex_vals

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1411def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1412    """
1413    Translate a params dictionary into lists of include- and exclude-values.
1414
1415    Parameters
1416    ----------
1417    params: Optional[Dict[str, Any]]
1418        A params query dictionary.
1419
1420    Returns
1421    -------
1422    A dictionary mapping keys to a tuple of lists for include and exclude values.
1423
1424    Examples
1425    --------
1426    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
1427    {'a': (['b', 'c', 'e'], ['d', 'f'])}
1428    """
1429    if not params:
1430        return {}
1431    return {
1432        col: separate_negation_values(
1433            (
1434                val
1435                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
1436                else [val]
1437            )
1438        )
1439        for col, val in params.items()
1440    }

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
1443def flatten_list(list_: List[Any]) -> List[Any]:
1444    """
1445    Recursively flatten a list.
1446    """
1447    for item in list_:
1448        if isinstance(item, list):
1449            yield from flatten_list(item)
1450        else:
1451            yield item

Recursively flatten a list.

def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1454def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1455    """
1456    Parse a string containing the text to be passed into a function
1457    and return a tuple of args, kwargs.
1458
1459    Parameters
1460    ----------
1461    args_str: str
1462        The contents of the function parameter (as a string).
1463
1464    Returns
1465    -------
1466    A tuple of args (tuple) and kwargs (dict[str, Any]).
1467
1468    Examples
1469    --------
1470    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
1471    (123, 456), {'foo': 789, 'bar': 'baz'}
1472    """
1473    import ast
1474    args = []
1475    kwargs = {}
1476
1477    for part in args_str.split(','):
1478        if '=' in part:
1479            key, val = part.split('=', 1)
1480            kwargs[key.strip()] = ast.literal_eval(val)
1481        else:
1482            args.append(ast.literal_eval(part.strip()))
1483
1484    return tuple(args), kwargs

Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.

Parameters
  • args_str (str): The contents of the function parameter (as a string).
Returns
  • A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
def parametrized(dec):
1558def parametrized(dec):
1559    """
1560    A meta-decorator for allowing other decorator functions to have parameters.
1561
1562    https://stackoverflow.com/a/26151604/9699829
1563    """
1564    def layer(*args, **kwargs):
1565        def repl(f):
1566            return dec(f, *args, **kwargs)
1567        return repl
1568    return layer

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
1571def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
1572    """
1573    Safely extract a TAR file to a give directory.
1574    This defends against CVE-2007-4559.
1575
1576    Parameters
1577    ----------
1578    tarf: file
1579        The TAR file opened with `tarfile.open(path, 'r:gz')`.
1580
1581    output_dir: Union[str, pathlib.Path]
1582        The output directory.
1583    """
1584
1585    def is_within_directory(directory, target):
1586        abs_directory = os.path.abspath(directory)
1587        abs_target = os.path.abspath(target)
1588        prefix = os.path.commonprefix([abs_directory, abs_target])
1589        return prefix == abs_directory 
1590
1591    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
1592        for member in tar.getmembers():
1593            member_path = os.path.join(path, member.name)
1594            if not is_within_directory(path, member_path):
1595                raise Exception("Attempted Path Traversal in Tar File")
1596
1597        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)
1598
1599    return safe_extract(tarf, output_dir)

Safely extract a TAR file to a give directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def to_snake_case(name: str) -> str:
1602def to_snake_case(name: str) -> str:
1603    """
1604    Return the given string in snake-case-style.
1605
1606    Parameters
1607    ----------
1608    name: str
1609        The input text to convert to snake case.
1610
1611    Returns
1612    -------
1613    A snake-case version of `name`.
1614
1615    Examples
1616    --------
1617    >>> to_snake_case("HelloWorld!")
1618    'hello_world'
1619    >>> to_snake_case("This has spaces in it.")
1620    'this_has_spaces_in_it'
1621    >>> to_snake_case("already_in_snake_case")
1622    'already_in_snake_case'
1623    """
1624    import re
1625    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
1626    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
1627    name = re.sub(r'[^\w\s]', '', name)
1628    name = re.sub(r'\s+', '_', name)
1629    return name.lower()

Return the given string in snake-case-style.

Parameters
  • name (str): The input text to convert to snake case.
Returns
  • A snake-case version of name.
Examples
>>> to_snake_case("HelloWorld!")
'hello_world'
>>> to_snake_case("This has spaces in it.")
'this_has_spaces_in_it'
>>> to_snake_case("already_in_snake_case")
'already_in_snake_case'
def get_directory_size(path: 'Path') -> int:
1632def get_directory_size(path: Path) -> int:
1633    """
1634    Return the cumulative size of a directory's files in bytes.
1635    https://stackoverflow.com/a/55659577/9699829
1636    """
1637    return sum(file.stat().st_size for file in path.rglob('*'))

Return the cumulative size of a directory's files in bytes. https://stackoverflow.com/a/55659577/9699829

def choose_subaction(*args, **kwargs) -> Any:
1644def choose_subaction(*args, **kwargs) -> Any:
1645    """
1646    Placeholder function to prevent breaking legacy behavior.
1647    See `meerschaum.actions.choose_subaction`.
1648    """
1649    from meerschaum.actions import choose_subaction as _choose_subactions
1650    return _choose_subactions(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choose_subaction.

def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
1770def json_serialize_datetime(dt: datetime) -> Union[str, None]:
1771    """
1772    Serialize a datetime object into JSON (ISO format string).
1773
1774    Examples
1775    --------
1776    >>> import json
1777    >>> from datetime import datetime
1778    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
1779    '{"a": "2022-01-01T00:00:00Z"}'
1780
1781    """
1782    from meerschaum.utils.dtypes import serialize_datetime
1783    return serialize_datetime(dt)

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def flatten_pipes_dict(*args, **kwargs):
1813def flatten_pipes_dict(*args, **kwargs):
1814    """
1815    Placeholder function to prevent breaking legacy behavior.
1816    See `meerschaum.utils.pipes.flatten_pipes_dict`.
1817    """
1818    from meerschaum.utils.pipes import flatten_pipes_dict
1819    return flatten_pipes_dict(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.pipes.flatten_pipes_dict.