meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9import sys
  10from datetime import timedelta, datetime, timezone
  11from meerschaum.utils.typing import (
  12    Union,
  13    Any,
  14    Callable,
  15    Optional,
  16    List,
  17    Dict,
  18    SuccessTuple,
  19    Iterable,
  20    PipesDict,
  21    Tuple,
  22    InstanceConnector,
  23    Hashable,
  24    Generator,
  25    Iterator,
  26    TYPE_CHECKING,
  27)
  28import meerschaum as mrsm
  29if TYPE_CHECKING:
  30    import collections
  31
  32__pdoc__: Dict[str, bool] = {
  33    'to_pandas_dtype': False,
  34    'filter_unseen_df': False,
  35    'add_missing_cols_to_df': False,
  36    'parse_df_datetimes': False,
  37    'df_from_literal': False,
  38    'get_json_cols': False,
  39    'get_unhashable_cols': False,
  40    'enforce_dtypes': False,
  41    'get_datetime_bound_from_df': False,
  42    'df_is_chunk_generator': False,
  43    'choices_docstring': False,
  44    '_get_subaction_names': False,
  45}
  46
  47
  48def add_method_to_class(
  49        func: Callable[[Any], Any],
  50        class_def: 'Class',
  51        method_name: Optional[str] = None,
  52        keep_self: Optional[bool] = None,
  53    ) -> Callable[[Any], Any]:
  54    """
  55    Add function `func` to class `class_def`.
  56
  57    Parameters
  58    ----------
  59    func: Callable[[Any], Any]
  60        Function to be added as a method of the class
  61
  62    class_def: Class
  63        Class to be modified.
  64
  65    method_name: Optional[str], default None
  66        New name of the method. None will use func.__name__ (default).
  67
  68    Returns
  69    -------
  70    The modified function object.
  71
  72    """
  73    from functools import wraps
  74
  75    is_class = isinstance(class_def, type)
  76    
  77    @wraps(func)
  78    def wrapper(self, *args, **kw):
  79        return func(*args, **kw)
  80
  81    if method_name is None:
  82        method_name = func.__name__
  83
  84    setattr(class_def, method_name, (
  85            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  86        )
  87    )
  88
  89    return func
  90
  91
  92def generate_password(length: int = 12) -> str:
  93    """
  94    Generate a secure password of given length.
  95
  96    Parameters
  97    ----------
  98    length: int, default 12
  99        The length of the password.
 100
 101    Returns
 102    -------
 103    A random password string.
 104    """
 105    import secrets
 106    import string
 107    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))
 108 
 109
 110def is_int(s: str) -> bool:
 111    """
 112    Check if string is an int.
 113
 114    Parameters
 115    ----------
 116    s: str
 117        The string to be checked.
 118        
 119    Returns
 120    -------
 121    A bool indicating whether the string was able to be cast to an integer.
 122
 123    """
 124    try:
 125        float(s)
 126    except Exception:
 127        return False
 128    
 129    return float(s).is_integer()
 130
 131
 132def string_to_dict(
 133    params_string: str
 134) -> Dict[str, Any]:
 135    """
 136    Parse a string into a dictionary.
 137    
 138    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 139
 140    Parameters
 141    ----------
 142    params_string: str
 143        The string to be parsed.
 144        
 145    Returns
 146    -------
 147    The parsed dictionary.
 148
 149    Examples
 150    --------
 151    >>> string_to_dict("a:1,b:2") 
 152    {'a': 1, 'b': 2}
 153    >>> string_to_dict('{"a": 1, "b": 2}')
 154    {'a': 1, 'b': 2}
 155
 156    """
 157    if params_string == "":
 158        return {}
 159
 160    import json
 161
 162    ### Kind of a weird edge case.
 163    ### In the generated compose file, there is some weird escaping happening,
 164    ### so the string to be parsed starts and ends with a single quote.
 165    if (
 166        isinstance(params_string, str)
 167        and len(params_string) > 4
 168        and params_string[1] == "{"
 169        and params_string[-2] == "}"
 170    ):
 171        return json.loads(params_string[1:-1])
 172    if str(params_string).startswith('{'):
 173        return json.loads(params_string)
 174
 175    import ast
 176    params_dict = {}
 177    for param in params_string.split(","):
 178        _keys = param.split(":")
 179        keys = _keys[:-1]
 180        try:
 181            val = ast.literal_eval(_keys[-1])
 182        except Exception:
 183            val = str(_keys[-1])
 184
 185        c = params_dict
 186        for _k in keys[:-1]:
 187            try:
 188                k = ast.literal_eval(_k)
 189            except Exception:
 190                k = str(_k)
 191            if k not in c:
 192                c[k] = {}
 193            c = c[k]
 194
 195        c[keys[-1]] = val
 196
 197    return params_dict
 198
 199
 200def parse_config_substitution(
 201    value: str,
 202    leading_key: str = 'MRSM',
 203    begin_key: str = '{',
 204    end_key: str = '}',
 205    delimeter: str = ':'
 206) -> List[Any]:
 207    """
 208    Parse Meerschaum substitution syntax
 209    E.g. MRSM{value1:value2} => ['value1', 'value2']
 210    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 211    """
 212    if not value.beginswith(leading_key):
 213        return value
 214
 215    return leading_key[len(leading_key):][len():-1].split(delimeter)
 216
 217
 218def edit_file(
 219    path: Union['pathlib.Path', str],
 220    default_editor: str = 'pyvim',
 221    debug: bool = False
 222) -> bool:
 223    """
 224    Open a file for editing.
 225
 226    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 227
 228    Parameters
 229    ----------
 230    path: Union[pathlib.Path, str]
 231        The path to the file to be edited.
 232
 233    default_editor: str, default 'pyvim'
 234        If `$EDITOR` is not set, use this instead.
 235        If `pyvim` is not installed, it will install it from PyPI.
 236
 237    debug: bool, default False
 238        Verbosity toggle.
 239
 240    Returns
 241    -------
 242    A bool indicating the file was successfully edited.
 243    """
 244    import os
 245    from subprocess import call
 246    from meerschaum.utils.debug import dprint
 247    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 248    try:
 249        EDITOR = os.environ.get('EDITOR', default_editor)
 250        if debug:
 251            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 252        rc = call([EDITOR, path])
 253    except Exception as e: ### can't open with default editors
 254        if debug:
 255            dprint(str(e))
 256            dprint('Failed to open file with system editor. Falling back to pyvim...')
 257        pyvim = attempt_import('pyvim', lazy=False)
 258        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 259    return rc == 0
 260
 261
 262def is_pipe_registered(
 263    pipe: mrsm.Pipe,
 264    pipes: PipesDict,
 265    debug: bool = False
 266) -> bool:
 267    """
 268    Check if a Pipe is inside the pipes dictionary.
 269
 270    Parameters
 271    ----------
 272    pipe: meerschaum.Pipe
 273        The pipe to see if it's in the dictionary.
 274
 275    pipes: PipesDict
 276        The dictionary to search inside.
 277
 278    debug: bool, default False
 279        Verbosity toggle.
 280
 281    Returns
 282    -------
 283    A bool indicating whether the pipe is inside the dictionary.
 284    """
 285    from meerschaum.utils.debug import dprint
 286    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
 287    if debug:
 288        dprint(f'{ck}, {mk}, {lk}')
 289        dprint(f'{pipe}, {pipes}')
 290    return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
 291
 292
 293def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 294    """
 295    Determine the columns and lines in the terminal.
 296    If they cannot be determined, return the default values (100 columns and 120 lines).
 297
 298    Parameters
 299    ----------
 300    default_cols: int, default 100
 301        If the columns cannot be determined, return this value.
 302
 303    default_lines: int, default 120
 304        If the lines cannot be determined, return this value.
 305
 306    Returns
 307    -------
 308    A tuple if integers for the columns and lines.
 309    """
 310    import os
 311    try:
 312        size = os.get_terminal_size()
 313        _cols, _lines = size.columns, size.lines
 314    except Exception as e:
 315        _cols, _lines = (
 316            int(os.environ.get('COLUMNS', str(default_cols))),
 317            int(os.environ.get('LINES', str(default_lines))),
 318        )
 319    return _cols, _lines
 320
 321
 322def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 323    """
 324    Iterate over a list in chunks.
 325    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 326
 327    Parameters
 328    ----------
 329    iterable: Iterable[Any]
 330        The iterable to iterate over in chunks.
 331        
 332    chunksize: int
 333        The size of chunks to iterate with.
 334        
 335    fillvalue: Optional[Any], default None
 336        If the chunks do not evenly divide into the iterable, pad the end with this value.
 337
 338    Returns
 339    -------
 340    A generator of tuples of size `chunksize`.
 341
 342    """
 343    from itertools import zip_longest
 344    args = [iter(iterable)] * chunksize
 345    return zip_longest(*args, fillvalue=fillvalue)
 346
 347def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 348    """
 349    Sort a dictionary's values and return a new dictionary.
 350
 351    Parameters
 352    ----------
 353    d: Dict[Any, Any]
 354        The dictionary to be sorted.
 355
 356    Returns
 357    -------
 358    A sorted dictionary.
 359
 360    Examples
 361    --------
 362    >>> sorted_dict({'b': 1, 'a': 2})
 363    {'b': 1, 'a': 2}
 364    >>> sorted_dict({'b': 2, 'a': 1})
 365    {'a': 1, 'b': 2}
 366
 367    """
 368    try:
 369        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 370    except Exception as e:
 371        return d
 372
 373def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
 374    """
 375    Convert the standard pipes dictionary into a list.
 376
 377    Parameters
 378    ----------
 379    pipes_dict: PipesDict
 380        The pipes dictionary to be flattened.
 381
 382    Returns
 383    -------
 384    A list of `Pipe` objects.
 385
 386    """
 387    pipes_list = []
 388    for ck in pipes_dict.values():
 389        for mk in ck.values():
 390            pipes_list += list(mk.values())
 391    return pipes_list
 392
 393
 394def round_time(
 395    dt: Optional[datetime] = None,
 396    date_delta: Optional[timedelta] = None,
 397    to: 'str' = 'down'
 398) -> datetime:
 399    """
 400    Round a datetime object to a multiple of a timedelta.
 401    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
 402
 403    NOTE: This function strips timezone information!
 404
 405    Parameters
 406    ----------
 407    dt: Optional[datetime], default None
 408        If `None`, grab the current UTC datetime.
 409
 410    date_delta: Optional[timedelta], default None
 411        If `None`, use a delta of 1 minute.
 412
 413    to: 'str', default 'down'
 414        Available options are `'up'`, `'down'`, and `'closest'`.
 415
 416    Returns
 417    -------
 418    A rounded `datetime` object.
 419
 420    Examples
 421    --------
 422    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
 423    datetime.datetime(2022, 1, 1, 12, 15)
 424    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
 425    datetime.datetime(2022, 1, 1, 12, 16)
 426    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
 427    datetime.datetime(2022, 1, 1, 12, 0)
 428    >>> round_time(
 429    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
 430    ...   timedelta(hours=1),
 431    ...   to = 'closest'
 432    ... )
 433    datetime.datetime(2022, 1, 1, 12, 0)
 434    >>> round_time(
 435    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
 436    ...   datetime.timedelta(hours=1),
 437    ...   to = 'closest'
 438    ... )
 439    datetime.datetime(2022, 1, 1, 13, 0)
 440
 441    """
 442    if date_delta is None:
 443        date_delta = timedelta(minutes=1)
 444    round_to = date_delta.total_seconds()
 445    if dt is None:
 446        dt = datetime.now(timezone.utc).replace(tzinfo=None)
 447    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds
 448
 449    if seconds % round_to == 0 and dt.microsecond == 0:
 450        rounding = (seconds + round_to / 2) // round_to * round_to
 451    else:
 452        if to == 'up':
 453            rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to
 454        elif to == 'down':
 455            rounding = seconds // round_to * round_to
 456        else:
 457            rounding = (seconds + round_to / 2) // round_to * round_to
 458
 459    return dt + timedelta(0, rounding - seconds, - dt.microsecond)
 460
 461
 462def timed_input(
 463        seconds: int = 10,
 464        timeout_message: str = "",
 465        prompt: str = "",
 466        icon: bool = False,
 467        **kw
 468    ) -> Union[str, None]:
 469    """
 470    Accept user input only for a brief period of time.
 471
 472    Parameters
 473    ----------
 474    seconds: int, default 10
 475        The number of seconds to wait.
 476
 477    timeout_message: str, default ''
 478        The message to print after the window has elapsed.
 479
 480    prompt: str, default ''
 481        The prompt to print during the window.
 482
 483    icon: bool, default False
 484        If `True`, print the configured input icon.
 485
 486
 487    Returns
 488    -------
 489    The input string entered by the user.
 490
 491    """
 492    import signal, time
 493
 494    class TimeoutExpired(Exception):
 495        """Raise this exception when the timeout is reached."""
 496
 497    def alarm_handler(signum, frame):
 498        raise TimeoutExpired
 499
 500    # set signal handler
 501    signal.signal(signal.SIGALRM, alarm_handler)
 502    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 503
 504    try:
 505        return input(prompt)
 506    except TimeoutExpired:
 507        return None
 508    except (EOFError, RuntimeError):
 509        try:
 510            print(prompt)
 511            time.sleep(seconds)
 512        except TimeoutExpired:
 513            return None
 514    finally:
 515        signal.alarm(0) # cancel alarm
 516
 517
 518
 519
 520
 521def replace_pipes_in_dict(
 522        pipes : Optional[PipesDict] = None,
 523        func: 'function' = str,
 524        debug: bool = False,
 525        **kw
 526    ) -> PipesDict:
 527    """
 528    Replace the Pipes in a Pipes dict with the result of another function.
 529
 530    Parameters
 531    ----------
 532    pipes: Optional[PipesDict], default None
 533        The pipes dict to be processed.
 534
 535    func: Callable[[Any], Any], default str
 536        The function to be applied to every pipe.
 537        Defaults to the string constructor.
 538
 539    debug: bool, default False
 540        Verbosity toggle.
 541    
 542
 543    Returns
 544    -------
 545    A dictionary where every pipe is replaced with the output of a function.
 546
 547    """
 548    import copy
 549    def change_dict(d : Dict[Any, Any], func : 'function') -> None:
 550        for k, v in d.items():
 551            if isinstance(v, dict):
 552                change_dict(v, func)
 553            else:
 554                d[k] = func(v)
 555
 556    if pipes is None:
 557        from meerschaum import get_pipes
 558        pipes = get_pipes(debug=debug, **kw)
 559
 560    result = copy.deepcopy(pipes)
 561    change_dict(result, func)
 562    return result
 563
 564def enforce_gevent_monkey_patch():
 565    """
 566    Check if gevent monkey patching is enabled, and if not, then apply patching.
 567    """
 568    from meerschaum.utils.packages import attempt_import
 569    import socket
 570    gevent, gevent_socket, gevent_monkey = attempt_import(
 571        'gevent', 'gevent.socket', 'gevent.monkey'
 572    )
 573    if not socket.socket is gevent_socket.socket:
 574        gevent_monkey.patch_all()
 575
 576def is_valid_email(email: str) -> Union['re.Match', None]:
 577    """
 578    Check whether a string is a valid email.
 579
 580    Parameters
 581    ----------
 582    email: str
 583        The string to be examined.
 584        
 585    Returns
 586    -------
 587    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 588
 589    Examples
 590    --------
 591    >>> is_valid_email('foo')
 592    >>> is_valid_email('foo@foo.com')
 593    <re.Match object; span=(0, 11), match='foo@foo.com'>
 594
 595    """
 596    import re
 597    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 598    return re.search(regex, email)
 599
 600
 601def string_width(string: str, widest: bool = True) -> int:
 602    """
 603    Calculate the width of a string, either by its widest or last line.
 604
 605    Parameters
 606    ----------
 607    string: str:
 608        The string to be examined.
 609        
 610    widest: bool, default True
 611        No longer used because `widest` is always assumed to be true.
 612
 613    Returns
 614    -------
 615    An integer for the text's visual width.
 616
 617    Examples
 618    --------
 619    >>> string_width('a')
 620    1
 621    >>> string_width('a\\nbc\\nd')
 622    2
 623
 624    """
 625    def _widest():
 626        words = string.split('\n')
 627        max_length = 0
 628        for w in words:
 629            length = len(w)
 630            if length > max_length:
 631                max_length = length
 632        return max_length
 633
 634    return _widest()
 635
 636def _pyinstaller_traverse_dir(
 637        directory: str,
 638        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 639        include_dotfiles: bool = False
 640    ) -> list:
 641    """
 642    Recursively traverse a directory and return a list of its contents.
 643    """
 644    import os, pathlib
 645    paths = []
 646    _directory = pathlib.Path(directory)
 647
 648    def _found_pattern(name: str):
 649        for pattern in ignore_patterns:
 650            if pattern.replace('/', os.path.sep) in str(name):
 651                return True
 652        return False
 653
 654    for root, dirs, files in os.walk(_directory):
 655        _root = str(root)[len(str(_directory.parent)):]
 656        if _root.startswith(os.path.sep):
 657            _root = _root[len(os.path.sep):]
 658        if _root.startswith('.') and not include_dotfiles:
 659            continue
 660        ### ignore certain patterns
 661        if _found_pattern(_root):
 662            continue
 663
 664        for filename in files:
 665            if filename.startswith('.') and not include_dotfiles:
 666                continue
 667            path = os.path.join(root, filename)
 668            if _found_pattern(path):
 669                continue
 670
 671            _path = str(path)[len(str(_directory.parent)):]
 672            if _path.startswith(os.path.sep):
 673                _path = _path[len(os.path.sep):]
 674            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 675
 676            paths.append((path, _path))
 677    return paths
 678
 679
 680def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 681    """
 682    Recursively replace passwords in a dictionary.
 683
 684    Parameters
 685    ----------
 686    d: Dict[str, Any]
 687        The dictionary to search through.
 688
 689    replace_with: str, default '*'
 690        The string to replace each character of the password with.
 691
 692    Returns
 693    -------
 694    Another dictionary where values to the keys `'password'`
 695    are replaced with `replace_with` (`'*'`).
 696
 697    Examples
 698    --------
 699    >>> replace_password({'a': 1})
 700    {'a': 1}
 701    >>> replace_password({'password': '123'})
 702    {'password': '***'}
 703    >>> replace_password({'nested': {'password': '123'}})
 704    {'nested': {'password': '***'}}
 705    >>> replace_password({'password': '123'}, replace_with='!')
 706    {'password': '!!!'}
 707
 708    """
 709    import copy
 710    _d = copy.deepcopy(d)
 711    for k, v in d.items():
 712        if isinstance(v, dict):
 713            _d[k] = replace_password(v)
 714        elif 'password' in str(k).lower():
 715            _d[k] = ''.join([replace_with for char in str(v)])
 716        elif str(k).lower() == 'uri':
 717            from meerschaum.connectors.sql import SQLConnector
 718            try:
 719                uri_params = SQLConnector.parse_uri(v)
 720            except Exception as e:
 721                uri_params = None
 722            if not uri_params:
 723                continue
 724            if not 'username' in uri_params or not 'password' in uri_params:
 725                continue
 726            _d[k] = v.replace(
 727                uri_params['username'] + ':' + uri_params['password'],
 728                uri_params['username'] + ':' + ''.join(
 729                    [replace_with for char in str(uri_params['password'])]
 730                )
 731            )
 732    return _d
 733
 734
 735def filter_arguments(
 736    func: Callable[[Any], Any],
 737    *args: Any,
 738    **kwargs: Any
 739) -> Tuple[Tuple[Any], Dict[str, Any]]:
 740    """
 741    Filter out unsupported positional and keyword arguments.
 742
 743    Parameters
 744    ----------
 745    func: Callable[[Any], Any]
 746        The function to inspect.
 747
 748    *args: Any
 749        Positional arguments to filter and pass to `func`.
 750
 751    **kwargs
 752        Keyword arguments to filter and pass to `func`.
 753
 754    Returns
 755    -------
 756    The `args` and `kwargs` accepted by `func`.
 757    """
 758    args = filter_positionals(func, *args)
 759    kwargs = filter_keywords(func, **kwargs)
 760    return args, kwargs
 761
 762
 763def filter_keywords(
 764    func: Callable[[Any], Any],
 765    **kw: Any
 766) -> Dict[str, Any]:
 767    """
 768    Filter out unsupported keyword arguments.
 769
 770    Parameters
 771    ----------
 772    func: Callable[[Any], Any]
 773        The function to inspect.
 774
 775    **kw: Any
 776        The arguments to be filtered and passed into `func`.
 777
 778    Returns
 779    -------
 780    A dictionary of keyword arguments accepted by `func`.
 781
 782    Examples
 783    --------
 784    ```python
 785    >>> def foo(a=1, b=2):
 786    ...     return a * b
 787    >>> filter_keywords(foo, a=2, b=4, c=6)
 788    {'a': 2, 'b': 4}
 789    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 790    8
 791    ```
 792
 793    """
 794    import inspect
 795    func_params = inspect.signature(func).parameters
 796    ### If the function has a **kw method, skip filtering.
 797    for param, _type in func_params.items():
 798        if '**' in str(_type):
 799            return kw
 800    return {k: v for k, v in kw.items() if k in func_params}
 801
 802
 803def filter_positionals(
 804    func: Callable[[Any], Any],
 805    *args: Any
 806) -> Tuple[Any]:
 807    """
 808    Filter out unsupported positional arguments.
 809
 810    Parameters
 811    ----------
 812    func: Callable[[Any], Any]
 813        The function to inspect.
 814
 815    *args: Any
 816        The arguments to be filtered and passed into `func`.
 817        NOTE: If the function signature expects more arguments than provided,
 818        the missing slots will be filled with `None`.
 819
 820    Returns
 821    -------
 822    A tuple of positional arguments accepted by `func`.
 823
 824    Examples
 825    --------
 826    ```python
 827    >>> def foo(a, b):
 828    ...     return a * b
 829    >>> filter_positionals(foo, 2, 4, 6)
 830    (2, 4)
 831    >>> foo(*filter_positionals(foo, 2, 4, 6))
 832    8
 833    ```
 834
 835    """
 836    import inspect
 837    from meerschaum.utils.warnings import warn
 838    func_params = inspect.signature(func).parameters
 839    acceptable_args: List[Any] = []
 840
 841    def _warn_invalids(_num_invalid):
 842        if _num_invalid > 0:
 843            warn(
 844                "Too few arguments were provided. "
 845                + f"{_num_invalid} argument"
 846                + ('s have ' if _num_invalid != 1 else " has ")
 847                + " been filled with `None`.",
 848            )
 849
 850    num_invalid: int = 0
 851    for i, (param, val) in enumerate(func_params.items()):
 852        if '=' in str(val) or '*' in str(val):
 853            _warn_invalids(num_invalid)
 854            return tuple(acceptable_args)
 855
 856        try:
 857            acceptable_args.append(args[i])
 858        except IndexError:
 859            acceptable_args.append(None)
 860            num_invalid += 1
 861
 862    _warn_invalids(num_invalid)
 863    return tuple(acceptable_args)
 864
 865
 866def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 867    """
 868    Convert an ordered dict to a dict.
 869    Does not mutate the original OrderedDict.
 870    """
 871    from collections import OrderedDict
 872    _d = dict(od)
 873    for k, v in od.items():
 874        if isinstance(v, OrderedDict) or (
 875            issubclass(type(v), OrderedDict)
 876        ):
 877            _d[k] = dict_from_od(v)
 878    return _d
 879
 880def remove_ansi(s: str) -> str:
 881    """
 882    Remove ANSI escape characters from a string.
 883
 884    Parameters
 885    ----------
 886    s: str:
 887        The string to be cleaned.
 888
 889    Returns
 890    -------
 891    A string with the ANSI characters removed.
 892
 893    Examples
 894    --------
 895    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 896    'Hello, World!'
 897
 898    """
 899    import re
 900    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 901
 902
 903def get_connector_labels(
 904    *types: str,
 905    search_term: str = '',
 906    ignore_exact_match = True,
 907    _additional_options: Optional[List[str]] = None,
 908) -> List[str]:
 909    """
 910    Read connector labels from the configuration dictionary.
 911
 912    Parameters
 913    ----------
 914    *types: str
 915        The connector types.
 916        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 917
 918    search_term: str, default ''
 919        A filter on the connectors' labels.
 920
 921    ignore_exact_match: bool, default True
 922        If `True`, skip a connector if the search_term is an exact match.
 923
 924    Returns
 925    -------
 926    A list of the keys of defined connectors.
 927
 928    """
 929    from meerschaum.config import get_config
 930    connectors = get_config('meerschaum', 'connectors')
 931
 932    _types = list(types)
 933    if len(_types) == 0:
 934        _types = list(connectors.keys()) + ['plugin']
 935
 936    conns = []
 937    for t in _types:
 938        if t == 'plugin':
 939            from meerschaum.plugins import get_data_plugins
 940            conns += [
 941                f'{t}:' + plugin.module.__name__.split('.')[-1]
 942                for plugin in get_data_plugins()
 943            ]
 944            continue
 945        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 946
 947    if _additional_options:
 948        conns += _additional_options
 949
 950    possibilities = [
 951        c
 952        for c in conns
 953        if c.startswith(search_term)
 954            and c != (
 955                search_term if ignore_exact_match else ''
 956            )
 957    ]
 958    return sorted(possibilities)
 959
 960
 961def wget(
 962    url: str,
 963    dest: Optional[Union[str, 'pathlib.Path']] = None,
 964    headers: Optional[Dict[str, Any]] = None,
 965    color: bool = True,
 966    debug: bool = False,
 967    **kw: Any
 968) -> 'pathlib.Path':
 969    """
 970    Mimic `wget` with `requests`.
 971
 972    Parameters
 973    ----------
 974    url: str
 975        The URL to the resource to be downloaded.
 976
 977    dest: Optional[Union[str, pathlib.Path]], default None
 978        The destination path of the downloaded file.
 979        If `None`, save to the current directory.
 980
 981    color: bool, default True
 982        If `debug` is `True`, print color output.
 983
 984    debug: bool, default False
 985        Verbosity toggle.
 986
 987    Returns
 988    -------
 989    The path to the downloaded file.
 990
 991    """
 992    from meerschaum.utils.warnings import warn, error
 993    from meerschaum.utils.debug import dprint
 994    import os, pathlib, re, urllib.request
 995    if headers is None:
 996        headers = {}
 997    request = urllib.request.Request(url, headers=headers)
 998    if not color:
 999        dprint = print
1000    if debug:
1001        dprint(f"Downloading from '{url}'...")
1002    try:
1003        response = urllib.request.urlopen(request)
1004    except Exception as e:
1005        import ssl
1006        ssl._create_default_https_context = ssl._create_unverified_context
1007        try:
1008            response = urllib.request.urlopen(request)
1009        except Exception as _e:
1010            print(_e)
1011            response = None
1012    if response is None or response.code != 200:
1013        error_msg = f"Failed to download from '{url}'."
1014        if color:
1015            error(error_msg)
1016        else:
1017            print(error_msg)
1018            import sys
1019            sys.exit(1)
1020
1021    d = response.headers.get('content-disposition', None)
1022    fname = (
1023        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1024        else url.split('/')[-1]
1025    )
1026
1027    if dest is None:
1028        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1029    elif isinstance(dest, str):
1030        dest = pathlib.Path(dest)
1031
1032    with open(dest, 'wb') as f:
1033        f.write(response.fp.read())
1034
1035    if debug:
1036        dprint(f"Downloaded file '{dest}'.")
1037
1038    return dest
1039
1040
def async_wrap(func):
    """
    Wrap a synchronous function in a coroutine function which runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments:
    `loop` (defaults to `asyncio.get_event_loop()`) and `executor`
    (passed to `loop.run_in_executor`). Everything else is forwarded to `func`.
    """
    import asyncio
    import functools

    @functools.wraps(func)
    async def _run_in_executor(*args, loop=None, executor=None, **kwargs):
        event_loop = asyncio.get_event_loop() if loop is None else loop
        ### `run_in_executor` only forwards positional arguments, so bind everything first.
        bound_call = functools.partial(func, *args, **kwargs)
        return await event_loop.run_in_executor(executor, bound_call)

    return _run_in_executor
1056
1057
def debug_trace(browser: bool = True):
    """
    Start a debugging trace of the program.

    This is a thin wrapper around `meerschaum.utils.debug.trace`.

    Parameters
    ----------
    browser: bool, default True
        Forwarded to `trace` (presumably toggles the web-based debugger).
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
1066
1067
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str: str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable is accepted (it is converted into a list first).

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, use this string as the separator between items.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    if not items:
        return ''

    ### Materialize once so sets, dict views, and generators behave like lists.
    items = list(items)
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''
    quoted = [q + str(item) + q for item in items]

    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item is simply joined by the separator;
    ### `oxford_comma` is irrelevant here.
    if not and_:
        return (c + s).join(quoted)

    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]
1154
1155
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Return a string describing an interval.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print.
        Values for which `is_int()` is true are returned via `str()` without further formatting.

    Returns
    -------
    `str(delta)` for integer-like values,
    otherwise the result of `humanfriendly.format_timespan()`.
    """
    from meerschaum.utils.packages import attempt_import

    if is_int(delta):
        return str(delta)

    humanfriendly = attempt_import('humanfriendly')
    if isinstance(delta, timedelta):
        seconds = delta.total_seconds()
    else:
        ### Any remaining non-`timedelta` value is treated as a number of minutes.
        seconds = delta * 60
    return humanfriendly.format_timespan(seconds)
1179
1180
def is_docker_available() -> bool:
    """
    Return `True` if `docker ps` exits with status 0.

    Any exception raised while launching the command is treated as unavailable.
    """
    import subprocess
    command = ['docker', 'ps']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1191
1192
def is_android() -> bool:
    """
    Return `True` if the `sys` module exposes `getandroidapilevel`.
    """
    import sys
    android_marker = 'getandroidapilevel'
    return hasattr(sys, android_marker)
1197
1198
def is_bcp_available() -> bool:
    """
    Return `True` if `bcp -v` exits with status 0.

    Any exception raised while launching the command is treated as unavailable.
    """
    import subprocess
    command = ['bcp', '-v']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1210
1211
def is_systemd_available() -> bool:
    """
    Return `True` if `systemctl -h` exits with status 0.

    Any exception raised while launching the command is treated as unavailable.
    """
    import subprocess
    command = ['systemctl', '-h']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1224
1225
def is_tmux_available() -> bool:
    """
    Return `True` if `tmux -V` exits with status 0.

    Any exception raised while launching the command is treated as unavailable.
    """
    import subprocess
    command = ['tmux', '-V']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
1240
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file by reading it backwards.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read (opened in binary mode).

    N: int
        The maximum number of lines to return.

    Returns
    -------
    A list of the decoded lines (without newline characters) in file order.
    If the file ends with a newline, the final element is an empty string.
    """
    import os
    # Lines are collected last-to-first and reversed before returning.
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        # Start at the end of the file and walk backwards one byte at a time.
        read_obj.seek(0, os.SEEK_END)
        # Bytes of the current line, stored in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                # Restore the byte order BEFORE decoding:
                # decoding reversed bytes breaks multi-byte characters.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # The start of the file was reached; whatever remains is the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return list(reversed(list_of_lines))
1281
1282
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file object
        A seekable, readable file object.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of trailing lines to skip before the returned lines.

    Returns
    -------
    A tuple of the list of lines and a bool indicating whether
    more lines precede the ones which were read.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    ### Nothing was requested: only report whether the file has any content.
    if to_read == 0:
        f.seek(0, 2)
        return [], f.tell() > 0

    while True:
        try:
            ### The estimate becomes a float once scaled, but seek offsets must be integers.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### Unless reading began at the start of the file, the first line read may be
        ### partial, so require one extra line to guarantee complete lines are returned.
        if len(lines) > to_read or pos == 0:
            stop = -offset if offset else None
            return lines[-to_read:stop], len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
1307
1308
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.
        Strings shorter than `max_len` are returned unchanged.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already one character (or empty) and the string is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    ### Split on the same delimiter used to re-join the sections.
    ### An empty delimiter cannot be split on, so treat the item as one section.
    sections = item.split(delimeter) if delimeter else [item]

    ### One character per section is reserved for the delimiters.
    available_chars = max_len - len(sections)

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        ### Drop the final character of every section which still has more than one.
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        new_sections_len = sum(len(section) for section in sections)
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)
1356
1357
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        The values to parse.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix go into the second list
        (with the prefix removed).
        If `None`, the prefix is read from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    A tuple of two lists: the included values (unchanged)
    and the excluded values (as strings without the prefix).
    """
    if negation_prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    prefix_len = len(negation_prefix)
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(negation_prefix):
            included.append(val)
            continue
        excluded.append(val_str[prefix_len:])

    return included, excluded
1386
1387
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Build include- and exclude-lists for every key of a params dictionary.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not lists or tuples are treated as single-item lists.

    Returns
    -------
    A dictionary mapping each key to the tuple returned by `separate_negation_values()`.
    An empty dictionary is returned for empty or `None` params.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params
1418
1419
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Lazily yield the items of a nested list in depth-first order.

    Only `list` instances are descended into; other containers are yielded as-is.
    Note that this is a generator, so wrap the call in `list()` to get a list.
    """
    ### Stack of iterators: the innermost list being walked is on top.
    pending = [iter(list_)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                pending.append(iter(item))
                break
            yield item
        else:
            ### The innermost list is exhausted; resume its parent.
            pending.pop()
1429
1430
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    Values are evaluated with `ast.literal_eval`, so only Python literals are accepted.
    Values may themselves contain commas or equals signs (e.g. lists or strings).

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).
    An empty (or whitespace-only) string returns `((), {})`.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], key="a,b"')
    (([1, 2],), {'key': 'a,b'})
    """
    import ast

    if not args_str or not args_str.strip():
        return (), {}

    ### Preferred path: let Python's own parser split the arguments
    ### so that commas and equals signs inside values are handled correctly.
    try:
        call = ast.parse(f"_({args_str})", mode='eval').body
    except SyntaxError:
        call = None

    if (
        isinstance(call, ast.Call)
        and isinstance(call.func, ast.Name)
        and call.func.id == '_'
        and not any(isinstance(arg, ast.Starred) for arg in call.args)
        and all(keyword.arg is not None for keyword in call.keywords)
    ):
        args = tuple(ast.literal_eval(arg) for arg in call.args)
        kwargs = {
            keyword.arg: ast.literal_eval(keyword.value)
            for keyword in call.keywords
        }
        return args, kwargs

    ### Fallback: the legacy comma-splitting behavior,
    ### which tolerates keys that are not valid Python identifiers.
    legacy_args = []
    legacy_kwargs = {}
    for part in args_str.split(','):
        if '=' in part:
            key, val = part.split('=', 1)
            legacy_kwargs[key.strip()] = ast.literal_eval(val.strip())
        else:
            legacy_args.append(ast.literal_eval(part.strip()))

    return tuple(legacy_args), legacy_kwargs
1462
1463
def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Create `dest_path` as a symlink to `src_path`, with fallbacks for Windows.

    On Windows, if the symlink cannot be created, a junction is created for
    directories and files are copied instead.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path.

    dest_path: pathlib.Path
        The destination path.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    already_linked = dest_path.exists() and dest_path.resolve() == src_path.resolve()
    if already_linked:
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows has further fallbacks.
    import platform
    on_windows = platform.system() == 'Windows'
    if not on_windows:
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories may be linked with a junction.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)
    return True, "Success"
1517
1518
def is_symlink(path: pathlib.Path) -> bool:
    """
    Return `True` if `path` is a symlink.

    On Windows, a path which `os.readlink()` can resolve is also considered a symlink
    (adds support for junctions).
    """
    if path.is_symlink():
        return True

    import platform
    import os
    on_windows = platform.system() == 'Windows'
    if not on_windows:
        return False

    try:
        link_target = os.readlink(path)
    except OSError:
        return False
    return bool(link_target)
1533
1534
def parametrized(dec):
    """
    Turn `dec(func, *args, **kwargs)` into a decorator factory.

    The returned callable takes the decorator's parameters and produces
    a decorator which applies `dec` to the decorated function with them.

    https://stackoverflow.com/a/26151604/9699829
    """
    def accept_params(*dec_args, **dec_kwargs):
        def apply_to(func):
            return dec(func, *dec_args, **dec_kwargs)
        return apply_to
    return accept_params
1546
1547
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If a member's path would resolve outside of `output_dir`.
        Nothing is extracted in this case.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def _is_within_output_dir(target) -> bool:
        """Check whether `target` resolves to a path inside of the output directory."""
        abs_target = os.path.abspath(target)
        try:
            ### `commonpath` compares whole path components.
            ### (`commonprefix` compares characters, so '/out_evil' would pass for '/out'.)
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            ### E.g. paths on different drives cannot share a common path.
            return False

    ### Validate every member before extracting anything.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not _is_within_output_dir(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)
1578
1579
1580##################
1581# Legacy imports #
1582##################
1583
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _target
    return _target(*args, **kwargs)
1591
1592
def print_options(*args, **kwargs) -> None:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.formatting.print_options`.
    """
    from meerschaum.utils.formatting import print_options as _target
    return _target(*args, **kwargs)
1600
1601
def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _target
    return _target(*args, **kwargs)
1609
1610
def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as _target
    return _target(*args, **kwargs)
1618
1619
def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as _target
    return _target(*args, **kwargs)
1627
1628
def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as _target
    return _target(*args, **kwargs)
1636
1637
def df_from_literal(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as _target
    return _target(*args, **kwargs)
1645
1646
def get_json_cols(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as _target
    return _target(*args, **kwargs)
1654
1655
def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as _target
    return _target(*args, **kwargs)
1663
1664
def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as _target
    return _target(*args, **kwargs)
1672
1673
def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as _target
    return _target(*args, **kwargs)
1681
1682
def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as _target
    return _target(*args, **kwargs)
1690
1691
def choices_docstring(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as _target
    return _target(*args, **kwargs)
1699
1700
def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    Forwards all arguments to `meerschaum.actions._get_subaction_names`.
    """
    from meerschaum.actions import _get_subaction_names as _target
    return _target(*args, **kwargs)
1708
1709
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object for JSON by delegating to
    `meerschaum.utils.dtypes.serialize_datetime`.

    Intended for use as the `default` argument of `json.dumps()`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime as _serialize
    return _serialize(dt)
1724
1725
# Look up this module's own object so that callables can be matched by their defining module.
_current_module = sys.modules[__name__]
# Export every callable whose `__module__` is this module,
# except for the names listed as keys of `__pdoc__`.
# NOTE: A generator expression keeps the loop variables out of the module globals,
# so `globals()` does not change size while it is being iterated.
__all__ = tuple(
    name
    for name, obj in globals().items()
    if callable(obj)
        and name not in __pdoc__
        and getattr(obj, '__module__', None) == _current_module.__name__
)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Attach the function `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or object) to be modified.

    method_name: Optional[str], default None
        The attribute name to use. `None` will use `func.__name__`.

    keep_self: Optional[bool], default None
        If `False`, attach a wrapper which discards its first positional argument (`self`)
        before calling `func`.
        If `None`, the wrapper is used only when `class_def` is a class.
        Otherwise `func` itself is attached.

    Returns
    -------
    The original function `func`.

    """
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kw):
        ### Discard `self` so that `func` receives only the remaining arguments.
        return func(*args, **kw)

    if keep_self is None:
        drop_self = isinstance(class_def, type)
    else:
        drop_self = keep_self is False

    attribute_name = func.__name__ if method_name is None else method_name
    setattr(class_def, attribute_name, wrapper if drop_self else func)

    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 93def generate_password(length: int = 12) -> str:
 94    """
 95    Generate a secure password of given length.
 96
 97    Parameters
 98    ----------
 99    length: int, default 12
100        The length of the password.
101
102    Returns
103    -------
104    A random password string.
105    """
106    import secrets
107    import string
108    return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
def is_int(s: str) -> bool:
    """
    Check if a value represents a whole number.

    Parameters
    ----------
    s: str
        The value to be checked.

    Returns
    -------
    `True` if `float(s)` succeeds and the result has no fractional part
    (e.g. `'2'` and `'2.0'`), otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        return False

    return as_float.is_integer()

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def string_to_dict(params_string: str) -> Dict[str, Any]:
def string_to_dict(
    params_string: str
) -> Dict[str, Any]:
    """
    Parse a string into a dictionary.

    Strings beginning with '{' (optionally wrapped in one extra leading and trailing
    character, e.g. quotes) are parsed as JSON.
    Otherwise the string is parsed as comma-separated `key:value` pairs,
    where additional colons build nested dictionaries (`a:b:1` -> `{'a': {'b': 1}}`).

    Parameters
    ----------
    params_string: str
        The string to be parsed.

    Returns
    -------
    The parsed dictionary.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if params_string == "":
        return {}

    import json

    ### Edge case: the generated compose file escapes the string,
    ### so the JSON may be wrapped in an additional pair of quotes.
    is_wrapped_json = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if is_wrapped_json:
        return json.loads(params_string[1:-1])

    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    def _literal_or_str(text):
        """Evaluate `text` as a Python literal, falling back to the raw string."""
        try:
            return ast.literal_eval(text)
        except Exception:
            return str(text)

    parsed = {}
    for pair in params_string.split(","):
        *key_path, raw_val = pair.split(":")
        val = _literal_or_str(raw_val)

        ### Walk (and create) the nested dictionaries for all but the final key.
        cursor = parsed
        for raw_key in key_path[:-1]:
            key = _literal_or_str(raw_key)
            if key not in cursor:
                cursor[key] = {}
            cursor = cursor[key]

        ### The final key is stored as-is (it is not evaluated as a literal).
        cursor[key_path[-1]] = val

    return parsed

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':'
) -> List[Any]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to parse.

    leading_key: str, default 'MRSM'
        The text which must precede `begin_key`.

    begin_key: str, default '{'
        The text which opens the substitution.

    end_key: str, default '}'
        The text which closes the substitution.

    delimeter: str, default ':'
        The separator between the keys inside the substitution.

    Returns
    -------
    A list of the keys inside the substitution.
    If `value` does not follow the substitution syntax, it is returned unchanged.
    """
    prefix = leading_key + begin_key
    if (
        not isinstance(value, str)
        or len(value) < len(prefix) + len(end_key)
        or not value.startswith(prefix)
        or not value.endswith(end_key)
    ):
        return value

    ### Compute the end index explicitly so that an empty `end_key` is handled correctly.
    inner = value[len(prefix):len(value) - len(end_key)]
    return inner.split(delimeter)

Parse Meerschaum substitution syntax, e.g. MRSM{value1:value2} => ['value1', 'value2']. NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
def edit_file(
    path: Union['pathlib.Path', str],
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Open a file in an editor.

    The editor is taken from the `EDITOR` environment variable (or `default_editor`).
    If launching it raises an exception, `pyvim` is run as a Python package instead.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        If `$EDITOR` is not set, use this instead.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether the editor exited with a return code of 0.
    """
    import os
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        EDITOR = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
        return_code = call([EDITOR, path])
    except Exception as e:
        ### The system editor could not be launched.
        if debug:
            dprint(str(e))
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        pyvim_venv = package_venv(pyvim)
        return_code = run_python_package('pyvim', [path], venv=pyvim_venv, debug=debug)

    return return_code == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def is_pipe_registered( pipe: meerschaum.Pipe, pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], debug: bool = False) -> bool:
def is_pipe_registered(
    pipe: mrsm.Pipe,
    pipes: PipesDict,
    debug: bool = False
) -> bool:
    """
    Check whether a pipe's keys are present in a nested pipes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose connector, metric, and location keys are looked up.

    pipes: PipesDict
        The dictionary to search inside
        (indexed by connector keys, then metric key, then location key).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether the pipe's keys are inside the dictionary.
    """
    from meerschaum.utils.debug import dprint

    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
    if debug:
        dprint(f'{ck}, {mk}, {lk}')
        dprint(f'{pipe}, {pipes}')

    if ck not in pipes:
        return False
    metrics = pipes[ck]
    if mk not in metrics:
        return False
    return lk in metrics[mk]

Check if a Pipe is inside the pipes dictionary.

Parameters
  • pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
  • pipes (PipesDict): The dictionary to search inside.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating whether the pipe is inside the dictionary.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Determine the columns and lines in the terminal.
    If they cannot be determined, fall back to the `COLUMNS` and `LINES`
    environment variables, and finally to the provided default values.

    Parameters
    ----------
    default_cols: int, default 100
        If the columns cannot be determined, return this value.

    default_lines: int, default 120
        If the lines cannot be determined, return this value.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    import os

    def _env_int(name: str, default: int) -> int:
        """Read an integer environment variable, using `default` if unset or malformed."""
        try:
            return int(os.environ.get(name, str(default)))
        except (TypeError, ValueError):
            # A non-numeric value (e.g. `COLUMNS=auto`) should not crash the caller.
            return default

    try:
        size = os.get_terminal_size()
        return size.columns, size.lines
    except Exception:
        # No terminal is attached (e.g. output is piped), so consult the environment.
        return _env_int('COLUMNS', default_cols), _env_int('LINES', default_lines)

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Yield consecutive fixed-size groups of items from an iterable.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to split into chunks.

    chunksize: int
        The number of items in each chunk.

    fillvalue: Optional[Any], default None
        The value used to pad the final chunk
        when the iterable's length is not a multiple of `chunksize`.

    Returns
    -------
    An iterator of tuples, each of length `chunksize`.

    """
    from itertools import zip_longest
    # Every slot refers to the same iterator object,
    # so building one output tuple consumes `chunksize` consecutive items.
    shared_iterator = iter(iterable)
    slots = (shared_iterator,) * chunksize
    return zip_longest(*slots, fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Build a new dictionary whose items are ordered by ascending value.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary ordered by value.
    If sorting fails (e.g. the values cannot be compared), `d` itself is returned unchanged.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        ordered_items = sorted(d.items(), key=lambda pair: pair[1])
        return dict(ordered_items)
    except Exception:
        # Best-effort: hand back the input rather than raising.
        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> 'List[Pipe]':
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collapse the three-level pipes dictionary into a flat list of its leaf values.

    Parameters
    ----------
    pipes_dict: PipesDict
        The pipes dictionary to be flattened.

    Returns
    -------
    A list of `Pipe` objects, in the dictionary's iteration order.

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.
    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

    NOTE: Timezone information is ignored when computing the rounding offset.
    When `dt` is `None`, the current UTC time is used as a naive datetime.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any value other than `'up'` or `'down'` is treated as `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    delta = timedelta(minutes=1) if date_delta is None else date_delta
    step = delta.total_seconds()
    moment = datetime.now(timezone.utc).replace(tzinfo=None) if dt is None else dt

    # Whole seconds elapsed within the day, measured against the minimum datetime.
    elapsed = (moment.replace(tzinfo=None) - moment.min.replace(tzinfo=None)).seconds

    # A value already sitting exactly on a multiple always takes the 'closest' formula,
    # which leaves it untouched.
    already_aligned = elapsed % step == 0 and moment.microsecond == 0

    if not already_aligned and to == 'up':
        target = (elapsed + moment.microsecond/1000000 + step) // step * step
    elif not already_aligned and to == 'down':
        target = elapsed // step * step
    else:
        target = (elapsed + step / 2) // step * step

    # Shift to the target second and drop the sub-second remainder.
    return moment + timedelta(0, target - elapsed, - moment.microsecond)

Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

NOTE: This function strips timezone information!

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
def timed_input(
        seconds: int = 10,
        timeout_message: str = "",
        prompt: str = "",
        icon: bool = False,
        **kw
    ) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    NOTE: This relies on `signal.SIGALRM`, so it must be called from the main thread
    on a platform which provides that signal.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        NOTE: Currently accepted but not used by this function.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        If `True`, print the configured input icon.
        NOTE: Currently accepted but not used by this function.

    Returns
    -------
    The input string entered by the user, or `None` if the window elapsed
    (or if standard input could not be read).

    """
    import signal, time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    # Install our handler and remember the previous one so it can be restored afterwards.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(seconds) # produce SIGALRM in `seconds` seconds

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        # Standard input is unavailable: show the prompt and wait out the window.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            return None
    finally:
        signal.alarm(0) # cancel alarm
        # `signal.signal()` returns `None` when the old handler was not installed from Python,
        # in which case it cannot be reinstated.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    return None

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def replace_pipes_in_dict( pipes: Optional[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]] = None, func: "'function'" = <class 'str'>, debug: bool = False, **kw) -> Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]:
def replace_pipes_in_dict(
        pipes : Optional[PipesDict] = None,
        func: 'function' = str,
        debug: bool = False,
        **kw
    ) -> PipesDict:
    """
    Return a copy of a pipes dictionary with every leaf replaced by `func(leaf)`.

    Parameters
    ----------
    pipes: Optional[PipesDict], default None
        The pipes dict to be processed.
        If `None`, the dictionary is fetched with `meerschaum.get_pipes(debug=debug, **kw)`.

    func: Callable[[Any], Any], default str
        The function to be applied to every non-dictionary value.
        Defaults to the string constructor.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Forwarded to `get_pipes()` when `pipes` is `None`.

    Returns
    -------
    A deep copy of the dictionary in which every pipe is replaced with the output of `func`.
    The input dictionary is left untouched.

    """
    import copy

    def _apply_in_place(mapping: Dict[Any, Any]) -> None:
        """Recurse through nested dictionaries, overwriting leaves with `func(leaf)`."""
        for key in mapping:
            value = mapping[key]
            if isinstance(value, dict):
                _apply_in_place(value)
                continue
            mapping[key] = func(value)

    if pipes is None:
        from meerschaum import get_pipes
        pipes = get_pipes(debug=debug, **kw)

    # Work on a deep copy so the caller's dictionary is never mutated.
    replaced = copy.deepcopy(pipes)
    _apply_in_place(replaced)
    return replaced

Replace the Pipes in a Pipes dict with the result of another function.

Parameters
  • pipes (Optional[PipesDict], default None): The pipes dict to be processed.
  • func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary where every pipe is replaced with the output of a function.
def enforce_gevent_monkey_patch():
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    import socket
    from meerschaum.utils.packages import attempt_import
    _gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    # The standard socket class is used as the indicator of whether patching has happened.
    already_patched = socket.socket is gevent_socket.socket
    if already_patched:
        return
    gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Check whether a string is a valid email.

    The check is a pragmatic pattern match (not a full RFC 5322 parser):
    a dot-separated local part, an `@`, one or more dot-separated domain labels,
    and a top-level label of at least two characters.

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>
    >>> is_valid_email('Foo.Bar+tag@mail.example.info')
    <re.Match object; span=(0, 29), match='Foo.Bar+tag@mail.example.info'>

    """
    import re
    # Local part: atoms of letters, digits, and `_%+-`, separated by single dots
    # (no leading, trailing, or consecutive dots).
    # Domain: one or more labels followed by a top-level label of two or more characters.
    regex = r'^[A-Za-z0-9_%+-]+(?:\.[A-Za-z0-9_%+-]+)*@(?:[\w-]+\.)+\w{2,}$'
    return re.search(regex, email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
def string_width(string: str, widest: bool = True) -> int:
    """
    Return the length of the longest line in a string.

    Parameters
    ----------
    string: str
        The string to be examined.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer for the number of characters in the string's widest line.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    # `split` always yields at least one (possibly empty) line, so `max` is never given nothing.
    return max(len(line) for line in string.split('\n'))

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def _pyinstaller_traverse_dir( directory: str, ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), include_dotfiles: bool = False) -> list:
637def _pyinstaller_traverse_dir(
638        directory: str,
639        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
640        include_dotfiles: bool = False
641    ) -> list:
642    """
643    Recursively traverse a directory and return a list of its contents.
644    """
645    import os, pathlib
646    paths = []
647    _directory = pathlib.Path(directory)
648
649    def _found_pattern(name: str):
650        for pattern in ignore_patterns:
651            if pattern.replace('/', os.path.sep) in str(name):
652                return True
653        return False
654
655    for root, dirs, files in os.walk(_directory):
656        _root = str(root)[len(str(_directory.parent)):]
657        if _root.startswith(os.path.sep):
658            _root = _root[len(os.path.sep):]
659        if _root.startswith('.') and not include_dotfiles:
660            continue
661        ### ignore certain patterns
662        if _found_pattern(_root):
663            continue
664
665        for filename in files:
666            if filename.startswith('.') and not include_dotfiles:
667                continue
668            path = os.path.join(root, filename)
669            if _found_pattern(path):
670                continue
671
672            _path = str(path)[len(str(_directory.parent)):]
673            if _path.startswith(os.path.sep):
674                _path = _path[len(os.path.sep):]
675            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
676
677            paths.append((path, _path))
678    return paths

Recursively traverse a directory and return a list of its contents.

def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        The same string is used for nested dictionaries.

    Returns
    -------
    Another dictionary where the values of keys containing `'password'` (case-insensitive)
    are replaced with `replace_with` (`'*'`), and where the password portion of
    a `'uri'` value is masked when it can be parsed.
    The input dictionary is not modified.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy

    def _mask(secret: Any) -> str:
        """Return one `replace_with` per character of the secret's string form."""
        return ''.join(replace_with for _ in str(secret))

    _d = copy.deepcopy(d)
    for k, v in d.items():
        if isinstance(v, dict):
            # Propagate `replace_with` so nested passwords use the caller's mask.
            _d[k] = replace_password(v, replace_with=replace_with)
        elif 'password' in str(k).lower():
            _d[k] = _mask(v)
        elif str(k).lower() == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(v)
            except Exception:
                # An unparsable URI is left as-is.
                uri_params = None
            if not uri_params:
                continue
            if not 'username' in uri_params or not 'password' in uri_params:
                continue
            _d[k] = v.replace(
                uri_params['username'] + ':' + uri_params['password'],
                uri_params['username'] + ':' + _mask(uri_params['password'])
            )
    return _d

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where values to the keys 'password' are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments( func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Tuple[Tuple[Any], Dict[str, Any]]:
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Reduce positional and keyword arguments to those `func` supports.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments, filtered with `filter_positionals()`.

    **kwargs
        Keyword arguments, filtered with `filter_keywords()`.

    Returns
    -------
    A tuple of the filtered `args` (a tuple) and the filtered `kwargs` (a dictionary).
    """
    return filter_positionals(func, *args), filter_keywords(func, **kwargs)

Filter out unsupported positional and keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): Positional arguments to filter and pass to func.
  • **kwargs: Keyword arguments to filter and pass to func.
Returns
  • The args and kwargs accepted by func.
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, all of `kw` is returned.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters

    ### If the function has a **kw parameter, skip filtering.
    ### Check the parameter kind rather than its string form,
    ### which may contain '**' in a default value or annotation.
    if any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    ):
        return kw

    ### Only these kinds may be supplied by name
    ### (positional-only and *args parameters cannot).
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    accepted_names = {
        name
        for name, param in func_params.items()
        if param.kind in keyword_kinds
    }
    return {k: v for k, v in kw.items() if k in accepted_names}

Filter out unsupported keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(func: Callable[[Any], Any], *args: Any) -> Tuple[Any]:
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Only the leading required positional parameters of `func` are considered:
    collection stops at the first parameter which has a default value
    or which cannot be passed positionally (`*args`, keyword-only, `**kwargs`).

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    acceptable_args: List[Any] = []
    num_invalid: int = 0

    for i, param in enumerate(func_params.values()):
        ### Inspect the parameter itself rather than its string form,
        ### which may contain '=' or '*' inside an annotation.
        has_default = param.default is not inspect.Parameter.empty
        if has_default or param.kind not in positional_kinds:
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Import lazily so that the warnings module is only loaded when needed.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have ' if num_invalid != 1 else " has ")
            + " been filled with `None`.",
        )
    return tuple(acceptable_args)

Filter out unsupported positional arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
  • A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
...     return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an ordered dict, converting nested `OrderedDict` values too.
    Does not mutate the original OrderedDict.

    Only values which are themselves `OrderedDict` instances are converted recursively;
    every other value is carried over as-is.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
def remove_ansi(s: str) -> str:
    """
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI escape sequences removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    # ESC followed by either a single-character sequence or a `[`-introduced control sequence.
    ansi_escape = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(ansi_escape, '', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels( *types: str, search_term: str = '', ignore_exact_match=True, _additional_options: Optional[List[str]] = None) -> List[str]:
def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match = True,
    _additional_options: Optional[List[str]] = None,
) -> List[str]:
    """
    Build the list of `type:label` connector keys from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use the types defined in the configuration and `'plugin'`.

    search_term: str, default ''
        Only keys beginning with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    _additional_options: Optional[List[str]], default None
        Extra keys to consider alongside the configured connectors.

    Returns
    -------
    A sorted list of the keys of defined connectors.
    Labels named `'default'` are omitted.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types)
    if len(requested_types) == 0:
        requested_types = list(connectors.keys()) + ['plugin']

    keys = []
    for connector_type in requested_types:
        if connector_type != 'plugin':
            keys += [
                f'{connector_type}:{label}'
                for label in connectors.get(connector_type, {})
                if label != 'default'
            ]
            continue

        # Plugins are labeled by the final component of their module name.
        from meerschaum.plugins import get_data_plugins
        keys += [
            f'{connector_type}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        ]

    if _additional_options:
        keys += _additional_options

    # When exact matches are allowed, only empty keys are excluded.
    excluded_key = search_term if ignore_exact_match else ''
    matches = [
        key
        for key in keys
        if key.startswith(search_term) and key != excluded_key
    ]
    return sorted(matches)

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
 962def wget(
 963    url: str,
 964    dest: Optional[Union[str, 'pathlib.Path']] = None,
 965    headers: Optional[Dict[str, Any]] = None,
 966    color: bool = True,
 967    debug: bool = False,
 968    **kw: Any
 969) -> 'pathlib.Path':
 970    """
 971    Mimic `wget` with `requests`.
 972
 973    Parameters
 974    ----------
 975    url: str
 976        The URL to the resource to be downloaded.
 977
 978    dest: Optional[Union[str, pathlib.Path]], default None
 979        The destination path of the downloaded file.
 980        If `None`, save to the current directory.
 981
 982    color: bool, default True
 983        If `debug` is `True`, print color output.
 984
 985    debug: bool, default False
 986        Verbosity toggle.
 987
 988    Returns
 989    -------
 990    The path to the downloaded file.
 991
 992    """
 993    from meerschaum.utils.warnings import warn, error
 994    from meerschaum.utils.debug import dprint
 995    import os, pathlib, re, urllib.request
 996    if headers is None:
 997        headers = {}
 998    request = urllib.request.Request(url, headers=headers)
 999    if not color:
1000        dprint = print
1001    if debug:
1002        dprint(f"Downloading from '{url}'...")
1003    try:
1004        response = urllib.request.urlopen(request)
1005    except Exception as e:
1006        import ssl
1007        ssl._create_default_https_context = ssl._create_unverified_context
1008        try:
1009            response = urllib.request.urlopen(request)
1010        except Exception as _e:
1011            print(_e)
1012            response = None
1013    if response is None or response.code != 200:
1014        error_msg = f"Failed to download from '{url}'."
1015        if color:
1016            error(error_msg)
1017        else:
1018            print(error_msg)
1019            import sys
1020            sys.exit(1)
1021
1022    d = response.headers.get('content-disposition', None)
1023    fname = (
1024        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1025        else url.split('/')[-1]
1026    )
1027
1028    if dest is None:
1029        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1030    elif isinstance(dest, str):
1031        dest = pathlib.Path(dest)
1032
1033    with open(dest, 'wb') as f:
1034        f.write(response.fp.read())
1035
1036    if debug:
1037        dprint(f"Downloaded file '{dest}'.")
1038
1039    return dest

Mimic wget with urllib.request.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
def async_wrap(func):
    """
    Wrap a synchronous function in a coroutine function which runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments:
    `loop` (defaults to `asyncio.get_event_loop()`) and `executor`
    (passed to `loop.run_in_executor()`; `None` selects the loop's default executor).
    All other arguments are forwarded to `func`.
    """
    import asyncio
    from functools import wraps, partial

    @wraps(func)
    async def runner(*args, loop=None, executor=None, **kwargs):
        event_loop = asyncio.get_event_loop() if loop is None else loop
        # `run_in_executor` only forwards positional arguments, so bind everything up front.
        bound_call = partial(func, *args, **kwargs)
        return await event_loop.run_in_executor(executor, bound_call)

    return runner
def debug_trace(browser: bool = True):
def debug_trace(browser: bool = True):
    """
    Start a debugger trace at the point of the call.

    This is a thin wrapper which forwards `browser` to `meerschaum.utils.debug.trace`.

    Parameters
    ----------
    browser: bool, default True
        Passed through to `trace()` as its `browser` keyword argument.
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)

Open a web-based debugger to trace the execution of the program.

This is an alias import for meerschaum.utils.debug.trace.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable is accepted; it is materialized into a list first.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, insert this string between items.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"

    """
    # Materialize once so that sets, tuples and iterators behave like lists
    # (slicing and `len()` are needed below).
    items = list(items) if items else []
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    quoted_items = [q + str(item) + q for item in items]
    if len(quoted_items) == 1:
        return quoted_items[0]

    # Without a conjunction, the list is a plain comma-separated sequence;
    # the Oxford comma setting only matters when 'and' is present.
    if not and_:
        return (c + s).join(quoted_items)

    if len(quoted_items) == 2:
        return quoted_items[0] + s + and_str + s + quoted_items[1]

    output = (c + s).join(quoted_items[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted_items[-1]

Return a formatted string of list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in an English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int]) -> str:
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Return a human-readable string for an interval.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print.
        Values for which `is_int()` is true are returned as `str(delta)` unchanged.
        A `timedelta` is converted with `total_seconds()`;
        any other value is multiplied by 60 (i.e. treated as minutes).

    Returns
    -------
    A formatted string, fit for human eyes
    (produced by `humanfriendly.format_timespan()` for non-integer input).
    """
    from meerschaum.utils.packages import attempt_import

    if is_int(delta):
        return str(delta)

    humanfriendly = attempt_import('humanfriendly')
    if isinstance(delta, timedelta):
        seconds = delta.total_seconds()
    else:
        seconds = delta * 60
    return humanfriendly.format_timespan(seconds)

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Runs `docker ps` with its output discarded and reports whether it exited with status 0.
    Any exception raised while launching the command counts as "not available".
    """
    import subprocess
    command = ['docker', 'ps']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if we can connect to the Docker engine.

def is_android() -> bool:
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    The check is whether the `sys` module exposes `getandroidapilevel`.
    """
    import sys
    android_marker = 'getandroidapilevel'
    return hasattr(sys, android_marker)

Return True if the current platform is Android.

def is_bcp_available() -> bool:
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Runs `bcp -v` with its output discarded and reports whether it exited with status 0.
    Any exception raised while launching the command counts as "not installed".
    """
    import subprocess

    command = ['bcp', '-v']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if the MSSQL bcp utility is installed.

def is_systemd_available() -> bool:
def is_systemd_available() -> bool:
    """
    Check if running on systemd.

    Runs `systemctl -h` with its output discarded and reports whether it exited with status 0.
    Any exception raised while launching the command counts as "not available".
    """
    import subprocess
    command = ['systemctl', '-h']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if running on systemd.

def is_tmux_available() -> bool:
def is_tmux_available() -> bool:
    """
    Check if `tmux` is installed.

    Runs `tmux -V` with its output discarded and reports whether it exited with status 0.
    Any exception raised while launching the command counts as "not installed".
    """
    import subprocess
    command = ['tmux', '-V']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if tmux is installed.

def get_last_n_lines(file_name: str, N: int):
def get_last_n_lines(file_name: str, N: int) -> List[str]:
    """
    Return the last `N` lines of a file (like the `tail` command).

    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        Path to the file. It is opened in binary mode and scanned backwards from the end.

    N: int
        The maximum number of lines to return.
        If `N` is not positive, the count is never reached and every line is returned.

    Returns
    -------
    A list of decoded lines (without newline characters) in file order.
    If the file ends with a newline, the final element is an empty string.
    """
    import os
    collected_lines = []
    with open(file_name, 'rb') as read_obj:
        read_obj.seek(0, os.SEEK_END)
        # Bytes of the line currently being read, stored in reverse (last byte first).
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                # Restore the byte order BEFORE decoding:
                # decoding reversed bytes corrupts multi-byte characters.
                collected_lines.append(buffer[::-1].decode())
                if len(collected_lines) == N:
                    return collected_lines[::-1]
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # Reached the start of the file: whatever remains is the first line.
        if buffer:
            collected_lines.append(buffer[::-1].decode())
    return collected_lines[::-1]
def tail(f, n, offset=None):
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file-like
        An open, seekable file object. Its position is moved by this function.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines at the end of the file to skip.

    Returns
    -------
    A tuple of `(lines, has_more)`; `lines` come from `f.read().splitlines()`.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            # `avg_line_length` becomes a float after the first retry,
            # but seek offsets must be integers.
            f.seek(-int(avg_line_length * to_read), 2)
        except OSError:
            # The file is smaller than the window we tried to step back
            # (or relative seeks are unsupported): read from the beginning instead.
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        # When the window starts mid-file, its first line may be cut off,
        # so require one line more than requested before trusting the slice.
        # A non-positive `to_read` cannot grow the window, so return immediately.
        if pos == 0 or to_read <= 0 or len(lines) > to_read:
            end = -offset if offset else None
            return lines[-to_read:end], (len(lines) > to_read or pos > 0)
        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.
        Items shorter than `max_len` are returned unchanged.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already down to one character and the result is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    sections = item.split(delimeter)
    # Reserve room for one delimiter per section.
    available_chars = max_len - (len(sections) * len(delimeter))

    while sum(len(section) for section in sections) > available_chars:
        # Trim one trailing character from every section that can still spare one.
        shortened = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        if shortened == sections:
            raise Exception(f"String could not be truncated: '{item}'")
        sections = shortened

    return delimeter.join(sections)

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude (negated).

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix go into the second list
        (with the prefix removed).
        If `None`, read the prefix from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    A tuple of two lists: the included values (left untouched)
    and the excluded values (as strings, prefix stripped).
    """
    prefix = negation_prefix
    if prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(prefix):
            included.append(val)
            continue
        excluded.append(val_str[len(prefix):])

    return included, excluded

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not lists or tuples are treated as single-item lists.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values
    (as returned by `separate_negation_values()`).
    An empty dictionary is returned when `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
def flatten_list(list_: List[Any]) -> Generator[Any, None, None]:
    """
    Recursively flatten a list.

    Parameters
    ----------
    list_: List[Any]
        The (possibly nested) list to flatten. Only `list` instances are descended into;
        other containers (e.g. tuples) are yielded as-is.

    Returns
    -------
    A generator which lazily yields the non-list items in depth-first order.
    """
    # Use an explicit stack of iterators rather than recursion
    # so that deeply nested lists cannot exhaust the interpreter's recursion limit.
    pending = [iter(list_)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                # Descend; the parent iterator resumes where it left off afterwards.
                pending.append(iter(item))
                break
            yield item
        else:
            pending.pop()

Recursively flatten a list.

def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).
        Values must be Python literals (they are read with `ast.literal_eval()`).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], sep="a,b"')
    (([1, 2],), {'sep': 'a,b'})
    """
    import ast

    # Let Python's own parser split the arguments so that commas and equals signs
    # inside strings, lists, tuples, etc. are handled correctly.
    # Nothing is executed: values are only read with `literal_eval()`.
    try:
        call_node = ast.parse(f"_call({args_str})", mode='eval').body
    except SyntaxError:
        call_node = None

    if (
        isinstance(call_node, ast.Call)
        and isinstance(call_node.func, ast.Name)
        and all(keyword.arg is not None for keyword in call_node.keywords)
    ):
        args = tuple(ast.literal_eval(arg_node) for arg_node in call_node.args)
        kwargs = {
            keyword.arg: ast.literal_eval(keyword.value)
            for keyword in call_node.keywords
        }
        return args, kwargs

    # Fall back to naive comma-splitting for strings which are not valid call syntax
    # (e.g. a positional value after a keyword), preserving the legacy behavior.
    legacy_args = []
    legacy_kwargs = {}
    for part in args_str.split(','):
        key, equals, val = part.partition('=')
        if equals:
            legacy_kwargs[key.strip()] = ast.literal_eval(val.strip())
        else:
            legacy_args.append(ast.literal_eval(part.strip()))

    return tuple(legacy_args), legacy_kwargs

Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.

Parameters
  • args_str (str): The contents of the function parameter (as a string).
Returns
  • A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
def parametrized(dec):
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    https://stackoverflow.com/a/26151604/9699829

    Parameters
    ----------
    dec: Callable[..., Any]
        A decorator whose first argument is the function to decorate,
        followed by any extra parameters.

    Returns
    -------
    A decorator factory: calling it with the extra parameters returns a decorator
    which invokes `dec(f, *args, **kwargs)`.
    """
    from functools import wraps

    # Preserve the name and docstring of `dec` on the factory it is replaced with.
    @wraps(dec)
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl
    return layer

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
def safely_extract_tar(tarf: 'tarfile.TarFile', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: tarfile.TarFile
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If any member's path would resolve outside of `output_dir`.
        Nothing is extracted in that case.
    """
    import os

    def is_within_directory(directory, target):
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        # Compare whole path components: a character-wise prefix check would
        # accept a sibling such as `/tmp/out_evil` for the directory `/tmp/out`.
        try:
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            # E.g. paths on different drives: they cannot share a directory.
            return False
        return common_path == abs_directory

    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
        # Validate every member before extracting anything.
        for member in tar.getmembers():
            member_path = os.path.join(path, member.name)
            if not is_within_directory(path, member_path):
                raise Exception("Attempted Path Traversal in Tar File")

        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)

    return safe_extract(tarf, output_dir)

Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def choose_subaction(*args, **kwargs) -> Any:
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that existing imports continue to work.

    All positional and keyword arguments are forwarded unchanged;
    see `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _delegate
    return _delegate(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choose_subaction.

def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    This forwards `dt` to `meerschaum.utils.dtypes.serialize_datetime`
    and is suitable as the `default` callback for `json.dumps()`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime as _serialize
    return _serialize(dt)

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'