meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9import sys
  10from datetime import timedelta, datetime, timezone
  11from meerschaum.utils.typing import (
  12    Union,
  13    Any,
  14    Callable,
  15    Optional,
  16    List,
  17    Dict,
  18    SuccessTuple,
  19    Iterable,
  20    PipesDict,
  21    Tuple,
  22    InstanceConnector,
  23    Hashable,
  24    Generator,
  25    Iterator,
  26    TYPE_CHECKING,
  27)
  28import meerschaum as mrsm
  29if TYPE_CHECKING:
  30    import collections
  31
  32__pdoc__: Dict[str, bool] = {
  33    'to_pandas_dtype': False,
  34    'filter_unseen_df': False,
  35    'add_missing_cols_to_df': False,
  36    'parse_df_datetimes': False,
  37    'df_from_literal': False,
  38    'get_json_cols': False,
  39    'get_unhashable_cols': False,
  40    'enforce_dtypes': False,
  41    'get_datetime_bound_from_df': False,
  42    'df_is_chunk_generator': False,
  43    'choices_docstring': False,
  44    '_get_subaction_names': False,
  45}
  46
  47
  48def add_method_to_class(
  49        func: Callable[[Any], Any],
  50        class_def: 'Class',
  51        method_name: Optional[str] = None,
  52        keep_self: Optional[bool] = None,
  53    ) -> Callable[[Any], Any]:
  54    """
  55    Add function `func` to class `class_def`.
  56
  57    Parameters
  58    ----------
  59    func: Callable[[Any], Any]
  60        Function to be added as a method of the class
  61
  62    class_def: Class
  63        Class to be modified.
  64
  65    method_name: Optional[str], default None
  66        New name of the method. None will use func.__name__ (default).
  67
  68    Returns
  69    -------
  70    The modified function object.
  71
  72    """
  73    from functools import wraps
  74
  75    is_class = isinstance(class_def, type)
  76    
  77    @wraps(func)
  78    def wrapper(self, *args, **kw):
  79        return func(*args, **kw)
  80
  81    if method_name is None:
  82        method_name = func.__name__
  83
  84    setattr(class_def, method_name, (
  85            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  86        )
  87    )
  88
  89    return func
  90
  91
  92def generate_password(length: int = 12) -> str:
  93    """Generate a secure password of given length.
  94
  95    Parameters
  96    ----------
  97    length : int, default 12
  98        The length of the password.
  99
 100    Returns
 101    -------
 102    A random password string.
 103
 104    """
 105    import secrets, string
 106    return ''.join((secrets.choice(string.ascii_letters) for i in range(length)))
 107
 108def is_int(s : str) -> bool:
 109    """
 110    Check if string is an int.
 111
 112    Parameters
 113    ----------
 114    s: str
 115        The string to be checked.
 116        
 117    Returns
 118    -------
 119    A bool indicating whether the string was able to be cast to an integer.
 120
 121    """
 122    try:
 123        float(s)
 124    except Exception as e:
 125        return False
 126    
 127    return float(s).is_integer()
 128
 129
 130def string_to_dict(
 131        params_string: str
 132    ) -> Dict[str, Any]:
 133    """
 134    Parse a string into a dictionary.
 135    
 136    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 137
 138    Parameters
 139    ----------
 140    params_string: str
 141        The string to be parsed.
 142        
 143    Returns
 144    -------
 145    The parsed dictionary.
 146
 147    Examples
 148    --------
 149    >>> string_to_dict("a:1,b:2") 
 150    {'a': 1, 'b': 2}
 151    >>> string_to_dict('{"a": 1, "b": 2}')
 152    {'a': 1, 'b': 2}
 153
 154    """
 155    if params_string == "":
 156        return {}
 157
 158    import json
 159
 160    ### Kind of a weird edge case.
 161    ### In the generated compose file, there is some weird escaping happening,
 162    ### so the string to be parsed starts and ends with a single quote.
 163    if (
 164        isinstance(params_string, str)
 165        and len(params_string) > 4
 166        and params_string[1] == "{"
 167        and params_string[-2] == "}"
 168    ):
 169        return json.loads(params_string[1:-1])
 170    if str(params_string).startswith('{'):
 171        return json.loads(params_string)
 172
 173    import ast
 174    params_dict = {}
 175    for param in params_string.split(","):
 176        _keys = param.split(":")
 177        keys = _keys[:-1]
 178        try:
 179            val = ast.literal_eval(_keys[-1])
 180        except Exception as e:
 181            val = str(_keys[-1])
 182
 183        c = params_dict
 184        for _k in keys[:-1]:
 185            try:
 186                k = ast.literal_eval(_k)
 187            except Exception as e:
 188                k = str(_k)
 189            if k not in c:
 190                c[k] = {}
 191            c = c[k]
 192
 193        c[keys[-1]] = val
 194
 195    return params_dict
 196
 197
 198def parse_config_substitution(
 199        value: str,
 200        leading_key: str = 'MRSM',
 201        begin_key: str = '{',
 202        end_key: str = '}',
 203        delimeter: str = ':'
 204    ) -> List[Any]:
 205    """
 206    Parse Meerschaum substitution syntax
 207    E.g. MRSM{value1:value2} => ['value1', 'value2']
 208    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 209    """
 210    if not value.beginswith(leading_key):
 211        return value
 212
 213    return leading_key[len(leading_key):][len():-1].split(delimeter)
 214
 215
 216def edit_file(
 217    path: Union['pathlib.Path', str],
 218    default_editor: str = 'pyvim',
 219    debug: bool = False
 220) -> bool:
 221    """
 222    Open a file for editing.
 223
 224    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 225
 226    Parameters
 227    ----------
 228    path: Union[pathlib.Path, str]
 229        The path to the file to be edited.
 230
 231    default_editor: str, default 'pyvim'
 232        If `$EDITOR` is not set, use this instead.
 233        If `pyvim` is not installed, it will install it from PyPI.
 234
 235    debug: bool, default False
 236        Verbosity toggle.
 237
 238    Returns
 239    -------
 240    A bool indicating the file was successfully edited.
 241    """
 242    import os
 243    from subprocess import call
 244    from meerschaum.utils.debug import dprint
 245    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 246    try:
 247        EDITOR = os.environ.get('EDITOR', default_editor)
 248        if debug:
 249            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 250        rc = call([EDITOR, path])
 251    except Exception as e: ### can't open with default editors
 252        if debug:
 253            dprint(str(e))
 254            dprint('Failed to open file with system editor. Falling back to pyvim...')
 255        pyvim = attempt_import('pyvim', lazy=False)
 256        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 257    return rc == 0
 258
 259
 260def is_pipe_registered(
 261    pipe: mrsm.Pipe,
 262    pipes: PipesDict,
 263    debug: bool = False
 264) -> bool:
 265    """
 266    Check if a Pipe is inside the pipes dictionary.
 267
 268    Parameters
 269    ----------
 270    pipe: meerschaum.Pipe
 271        The pipe to see if it's in the dictionary.
 272
 273    pipes: PipesDict
 274        The dictionary to search inside.
 275
 276    debug: bool, default False
 277        Verbosity toggle.
 278
 279    Returns
 280    -------
 281    A bool indicating whether the pipe is inside the dictionary.
 282    """
 283    from meerschaum.utils.debug import dprint
 284    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
 285    if debug:
 286        dprint(f'{ck}, {mk}, {lk}')
 287        dprint(f'{pipe}, {pipes}')
 288    return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
 289
 290
 291def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 292    """
 293    Determine the columns and lines in the terminal.
 294    If they cannot be determined, return the default values (100 columns and 120 lines).
 295
 296    Parameters
 297    ----------
 298    default_cols: int, default 100
 299        If the columns cannot be determined, return this value.
 300
 301    default_lines: int, default 120
 302        If the lines cannot be determined, return this value.
 303
 304    Returns
 305    -------
 306    A tuple if integers for the columns and lines.
 307    """
 308    import os
 309    try:
 310        size = os.get_terminal_size()
 311        _cols, _lines = size.columns, size.lines
 312    except Exception as e:
 313        _cols, _lines = (
 314            int(os.environ.get('COLUMNS', str(default_cols))),
 315            int(os.environ.get('LINES', str(default_lines))),
 316        )
 317    return _cols, _lines
 318
 319
 320def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 321    """
 322    Iterate over a list in chunks.
 323    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 324
 325    Parameters
 326    ----------
 327    iterable: Iterable[Any]
 328        The iterable to iterate over in chunks.
 329        
 330    chunksize: int
 331        The size of chunks to iterate with.
 332        
 333    fillvalue: Optional[Any], default None
 334        If the chunks do not evenly divide into the iterable, pad the end with this value.
 335
 336    Returns
 337    -------
 338    A generator of tuples of size `chunksize`.
 339
 340    """
 341    from itertools import zip_longest
 342    args = [iter(iterable)] * chunksize
 343    return zip_longest(*args, fillvalue=fillvalue)
 344
 345def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 346    """
 347    Sort a dictionary's values and return a new dictionary.
 348
 349    Parameters
 350    ----------
 351    d: Dict[Any, Any]
 352        The dictionary to be sorted.
 353
 354    Returns
 355    -------
 356    A sorted dictionary.
 357
 358    Examples
 359    --------
 360    >>> sorted_dict({'b': 1, 'a': 2})
 361    {'b': 1, 'a': 2}
 362    >>> sorted_dict({'b': 2, 'a': 1})
 363    {'a': 1, 'b': 2}
 364
 365    """
 366    try:
 367        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 368    except Exception as e:
 369        return d
 370
 371def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
 372    """
 373    Convert the standard pipes dictionary into a list.
 374
 375    Parameters
 376    ----------
 377    pipes_dict: PipesDict
 378        The pipes dictionary to be flattened.
 379
 380    Returns
 381    -------
 382    A list of `Pipe` objects.
 383
 384    """
 385    pipes_list = []
 386    for ck in pipes_dict.values():
 387        for mk in ck.values():
 388            pipes_list += list(mk.values())
 389    return pipes_list
 390
 391
 392def round_time(
 393    dt: Optional[datetime] = None,
 394    date_delta: Optional[timedelta] = None,
 395    to: 'str' = 'down'
 396) -> datetime:
 397    """
 398    Round a datetime object to a multiple of a timedelta.
 399    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
 400
 401    NOTE: This function strips timezone information!
 402
 403    Parameters
 404    ----------
 405    dt: Optional[datetime], default None
 406        If `None`, grab the current UTC datetime.
 407
 408    date_delta: Optional[timedelta], default None
 409        If `None`, use a delta of 1 minute.
 410
 411    to: 'str', default 'down'
 412        Available options are `'up'`, `'down'`, and `'closest'`.
 413
 414    Returns
 415    -------
 416    A rounded `datetime` object.
 417
 418    Examples
 419    --------
 420    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
 421    datetime.datetime(2022, 1, 1, 12, 15)
 422    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
 423    datetime.datetime(2022, 1, 1, 12, 16)
 424    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
 425    datetime.datetime(2022, 1, 1, 12, 0)
 426    >>> round_time(
 427    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
 428    ...   timedelta(hours=1),
 429    ...   to = 'closest'
 430    ... )
 431    datetime.datetime(2022, 1, 1, 12, 0)
 432    >>> round_time(
 433    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
 434    ...   datetime.timedelta(hours=1),
 435    ...   to = 'closest'
 436    ... )
 437    datetime.datetime(2022, 1, 1, 13, 0)
 438
 439    """
 440    if date_delta is None:
 441        date_delta = timedelta(minutes=1)
 442    round_to = date_delta.total_seconds()
 443    if dt is None:
 444        dt = datetime.now(timezone.utc).replace(tzinfo=None)
 445    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds
 446
 447    if seconds % round_to == 0 and dt.microsecond == 0:
 448        rounding = (seconds + round_to / 2) // round_to * round_to
 449    else:
 450        if to == 'up':
 451            rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to
 452        elif to == 'down':
 453            rounding = seconds // round_to * round_to
 454        else:
 455            rounding = (seconds + round_to / 2) // round_to * round_to
 456
 457    return dt + timedelta(0, rounding - seconds, - dt.microsecond)
 458
 459
 460def timed_input(
 461        seconds: int = 10,
 462        timeout_message: str = "",
 463        prompt: str = "",
 464        icon: bool = False,
 465        **kw
 466    ) -> Union[str, None]:
 467    """
 468    Accept user input only for a brief period of time.
 469
 470    Parameters
 471    ----------
 472    seconds: int, default 10
 473        The number of seconds to wait.
 474
 475    timeout_message: str, default ''
 476        The message to print after the window has elapsed.
 477
 478    prompt: str, default ''
 479        The prompt to print during the window.
 480
 481    icon: bool, default False
 482        If `True`, print the configured input icon.
 483
 484
 485    Returns
 486    -------
 487    The input string entered by the user.
 488
 489    """
 490    import signal, time
 491
 492    class TimeoutExpired(Exception):
 493        """Raise this exception when the timeout is reached."""
 494
 495    def alarm_handler(signum, frame):
 496        raise TimeoutExpired
 497
 498    # set signal handler
 499    signal.signal(signal.SIGALRM, alarm_handler)
 500    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 501
 502    try:
 503        return input(prompt)
 504    except TimeoutExpired:
 505        return None
 506    except (EOFError, RuntimeError):
 507        try:
 508            print(prompt)
 509            time.sleep(seconds)
 510        except TimeoutExpired:
 511            return None
 512    finally:
 513        signal.alarm(0) # cancel alarm
 514
 515
 516
 517
 518
 519def replace_pipes_in_dict(
 520        pipes : Optional[PipesDict] = None,
 521        func: 'function' = str,
 522        debug: bool = False,
 523        **kw
 524    ) -> PipesDict:
 525    """
 526    Replace the Pipes in a Pipes dict with the result of another function.
 527
 528    Parameters
 529    ----------
 530    pipes: Optional[PipesDict], default None
 531        The pipes dict to be processed.
 532
 533    func: Callable[[Any], Any], default str
 534        The function to be applied to every pipe.
 535        Defaults to the string constructor.
 536
 537    debug: bool, default False
 538        Verbosity toggle.
 539    
 540
 541    Returns
 542    -------
 543    A dictionary where every pipe is replaced with the output of a function.
 544
 545    """
 546    import copy
 547    def change_dict(d : Dict[Any, Any], func : 'function') -> None:
 548        for k, v in d.items():
 549            if isinstance(v, dict):
 550                change_dict(v, func)
 551            else:
 552                d[k] = func(v)
 553
 554    if pipes is None:
 555        from meerschaum import get_pipes
 556        pipes = get_pipes(debug=debug, **kw)
 557
 558    result = copy.deepcopy(pipes)
 559    change_dict(result, func)
 560    return result
 561
 562def enforce_gevent_monkey_patch():
 563    """
 564    Check if gevent monkey patching is enabled, and if not, then apply patching.
 565    """
 566    from meerschaum.utils.packages import attempt_import
 567    import socket
 568    gevent, gevent_socket, gevent_monkey = attempt_import(
 569        'gevent', 'gevent.socket', 'gevent.monkey'
 570    )
 571    if not socket.socket is gevent_socket.socket:
 572        gevent_monkey.patch_all()
 573
 574def is_valid_email(email: str) -> Union['re.Match', None]:
 575    """
 576    Check whether a string is a valid email.
 577
 578    Parameters
 579    ----------
 580    email: str
 581        The string to be examined.
 582        
 583    Returns
 584    -------
 585    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 586
 587    Examples
 588    --------
 589    >>> is_valid_email('foo')
 590    >>> is_valid_email('foo@foo.com')
 591    <re.Match object; span=(0, 11), match='foo@foo.com'>
 592
 593    """
 594    import re
 595    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 596    return re.search(regex, email)
 597
 598
 599def string_width(string: str, widest: bool = True) -> int:
 600    """
 601    Calculate the width of a string, either by its widest or last line.
 602
 603    Parameters
 604    ----------
 605    string: str:
 606        The string to be examined.
 607        
 608    widest: bool, default True
 609        No longer used because `widest` is always assumed to be true.
 610
 611    Returns
 612    -------
 613    An integer for the text's visual width.
 614
 615    Examples
 616    --------
 617    >>> string_width('a')
 618    1
 619    >>> string_width('a\\nbc\\nd')
 620    2
 621
 622    """
 623    def _widest():
 624        words = string.split('\n')
 625        max_length = 0
 626        for w in words:
 627            length = len(w)
 628            if length > max_length:
 629                max_length = length
 630        return max_length
 631
 632    return _widest()
 633
 634def _pyinstaller_traverse_dir(
 635        directory: str,
 636        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 637        include_dotfiles: bool = False
 638    ) -> list:
 639    """
 640    Recursively traverse a directory and return a list of its contents.
 641    """
 642    import os, pathlib
 643    paths = []
 644    _directory = pathlib.Path(directory)
 645
 646    def _found_pattern(name: str):
 647        for pattern in ignore_patterns:
 648            if pattern.replace('/', os.path.sep) in str(name):
 649                return True
 650        return False
 651
 652    for root, dirs, files in os.walk(_directory):
 653        _root = str(root)[len(str(_directory.parent)):]
 654        if _root.startswith(os.path.sep):
 655            _root = _root[len(os.path.sep):]
 656        if _root.startswith('.') and not include_dotfiles:
 657            continue
 658        ### ignore certain patterns
 659        if _found_pattern(_root):
 660            continue
 661
 662        for filename in files:
 663            if filename.startswith('.') and not include_dotfiles:
 664                continue
 665            path = os.path.join(root, filename)
 666            if _found_pattern(path):
 667                continue
 668
 669            _path = str(path)[len(str(_directory.parent)):]
 670            if _path.startswith(os.path.sep):
 671                _path = _path[len(os.path.sep):]
 672            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 673
 674            paths.append((path, _path))
 675    return paths
 676
 677
 678def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 679    """
 680    Recursively replace passwords in a dictionary.
 681
 682    Parameters
 683    ----------
 684    d: Dict[str, Any]
 685        The dictionary to search through.
 686
 687    replace_with: str, default '*'
 688        The string to replace each character of the password with.
 689
 690    Returns
 691    -------
 692    Another dictionary where values to the keys `'password'`
 693    are replaced with `replace_with` (`'*'`).
 694
 695    Examples
 696    --------
 697    >>> replace_password({'a': 1})
 698    {'a': 1}
 699    >>> replace_password({'password': '123'})
 700    {'password': '***'}
 701    >>> replace_password({'nested': {'password': '123'}})
 702    {'nested': {'password': '***'}}
 703    >>> replace_password({'password': '123'}, replace_with='!')
 704    {'password': '!!!'}
 705
 706    """
 707    import copy
 708    _d = copy.deepcopy(d)
 709    for k, v in d.items():
 710        if isinstance(v, dict):
 711            _d[k] = replace_password(v)
 712        elif 'password' in str(k).lower():
 713            _d[k] = ''.join([replace_with for char in str(v)])
 714        elif str(k).lower() == 'uri':
 715            from meerschaum.connectors.sql import SQLConnector
 716            try:
 717                uri_params = SQLConnector.parse_uri(v)
 718            except Exception as e:
 719                uri_params = None
 720            if not uri_params:
 721                continue
 722            if not 'username' in uri_params or not 'password' in uri_params:
 723                continue
 724            _d[k] = v.replace(
 725                uri_params['username'] + ':' + uri_params['password'],
 726                uri_params['username'] + ':' + ''.join(
 727                    [replace_with for char in str(uri_params['password'])]
 728                )
 729            )
 730    return _d
 731
 732
 733def filter_arguments(
 734    func: Callable[[Any], Any],
 735    *args: Any,
 736    **kwargs: Any
 737) -> Tuple[Tuple[Any], Dict[str, Any]]:
 738    """
 739    Filter out unsupported positional and keyword arguments.
 740
 741    Parameters
 742    ----------
 743    func: Callable[[Any], Any]
 744        The function to inspect.
 745
 746    *args: Any
 747        Positional arguments to filter and pass to `func`.
 748
 749    **kwargs
 750        Keyword arguments to filter and pass to `func`.
 751
 752    Returns
 753    -------
 754    The `args` and `kwargs` accepted by `func`.
 755    """
 756    args = filter_positionals(func, *args)
 757    kwargs = filter_keywords(func, **kwargs)
 758    return args, kwargs
 759
 760
 761def filter_keywords(
 762    func: Callable[[Any], Any],
 763    **kw: Any
 764) -> Dict[str, Any]:
 765    """
 766    Filter out unsupported keyword arguments.
 767
 768    Parameters
 769    ----------
 770    func: Callable[[Any], Any]
 771        The function to inspect.
 772
 773    **kw: Any
 774        The arguments to be filtered and passed into `func`.
 775
 776    Returns
 777    -------
 778    A dictionary of keyword arguments accepted by `func`.
 779
 780    Examples
 781    --------
 782    ```python
 783    >>> def foo(a=1, b=2):
 784    ...     return a * b
 785    >>> filter_keywords(foo, a=2, b=4, c=6)
 786    {'a': 2, 'b': 4}
 787    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 788    8
 789    ```
 790
 791    """
 792    import inspect
 793    func_params = inspect.signature(func).parameters
 794    ### If the function has a **kw method, skip filtering.
 795    for param, _type in func_params.items():
 796        if '**' in str(_type):
 797            return kw
 798    return {k: v for k, v in kw.items() if k in func_params}
 799
 800
 801def filter_positionals(
 802    func: Callable[[Any], Any],
 803    *args: Any
 804) -> Tuple[Any]:
 805    """
 806    Filter out unsupported positional arguments.
 807
 808    Parameters
 809    ----------
 810    func: Callable[[Any], Any]
 811        The function to inspect.
 812
 813    *args: Any
 814        The arguments to be filtered and passed into `func`.
 815        NOTE: If the function signature expects more arguments than provided,
 816        the missing slots will be filled with `None`.
 817
 818    Returns
 819    -------
 820    A tuple of positional arguments accepted by `func`.
 821
 822    Examples
 823    --------
 824    ```python
 825    >>> def foo(a, b):
 826    ...     return a * b
 827    >>> filter_positionals(foo, 2, 4, 6)
 828    (2, 4)
 829    >>> foo(*filter_positionals(foo, 2, 4, 6))
 830    8
 831    ```
 832
 833    """
 834    import inspect
 835    from meerschaum.utils.warnings import warn
 836    func_params = inspect.signature(func).parameters
 837    acceptable_args: List[Any] = []
 838
 839    def _warn_invalids(_num_invalid):
 840        if _num_invalid > 0:
 841            warn(
 842                "Too few arguments were provided. "
 843                + f"{_num_invalid} argument"
 844                + ('s have ' if _num_invalid != 1 else " has ")
 845                + " been filled with `None`.",
 846            )
 847
 848    num_invalid: int = 0
 849    for i, (param, val) in enumerate(func_params.items()):
 850        if '=' in str(val) or '*' in str(val):
 851            _warn_invalids(num_invalid)
 852            return tuple(acceptable_args)
 853
 854        try:
 855            acceptable_args.append(args[i])
 856        except IndexError:
 857            acceptable_args.append(None)
 858            num_invalid += 1
 859
 860    _warn_invalids(num_invalid)
 861    return tuple(acceptable_args)
 862
 863
 864def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 865    """
 866    Convert an ordered dict to a dict.
 867    Does not mutate the original OrderedDict.
 868    """
 869    from collections import OrderedDict
 870    _d = dict(od)
 871    for k, v in od.items():
 872        if isinstance(v, OrderedDict) or (
 873            issubclass(type(v), OrderedDict)
 874        ):
 875            _d[k] = dict_from_od(v)
 876    return _d
 877
 878def remove_ansi(s: str) -> str:
 879    """
 880    Remove ANSI escape characters from a string.
 881
 882    Parameters
 883    ----------
 884    s: str:
 885        The string to be cleaned.
 886
 887    Returns
 888    -------
 889    A string with the ANSI characters removed.
 890
 891    Examples
 892    --------
 893    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 894    'Hello, World!'
 895
 896    """
 897    import re
 898    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 899
 900
 901def get_connector_labels(
 902    *types: str,
 903    search_term: str = '',
 904    ignore_exact_match = True,
 905    _additional_options: Optional[List[str]] = None,
 906) -> List[str]:
 907    """
 908    Read connector labels from the configuration dictionary.
 909
 910    Parameters
 911    ----------
 912    *types: str
 913        The connector types.
 914        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 915
 916    search_term: str, default ''
 917        A filter on the connectors' labels.
 918
 919    ignore_exact_match: bool, default True
 920        If `True`, skip a connector if the search_term is an exact match.
 921
 922    Returns
 923    -------
 924    A list of the keys of defined connectors.
 925
 926    """
 927    from meerschaum.config import get_config
 928    connectors = get_config('meerschaum', 'connectors')
 929
 930    _types = list(types)
 931    if len(_types) == 0:
 932        _types = list(connectors.keys()) + ['plugin']
 933
 934    conns = []
 935    for t in _types:
 936        if t == 'plugin':
 937            from meerschaum.plugins import get_data_plugins
 938            conns += [
 939                f'{t}:' + plugin.module.__name__.split('.')[-1]
 940                for plugin in get_data_plugins()
 941            ]
 942            continue
 943        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 944
 945    if _additional_options:
 946        conns += _additional_options
 947
 948    possibilities = [
 949        c
 950        for c in conns
 951        if c.startswith(search_term)
 952            and c != (
 953                search_term if ignore_exact_match else ''
 954            )
 955    ]
 956    return sorted(possibilities)
 957
 958
 959def json_serialize_datetime(dt: datetime) -> Union[str, None]:
 960    """
 961    Serialize a datetime object into JSON (ISO format string).
 962
 963    Examples
 964    --------
 965    >>> import json
 966    >>> from datetime import datetime
 967    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
 968    '{"a": "2022-01-01T00:00:00Z"}'
 969
 970    """
 971    if not isinstance(dt, datetime):
 972        return None
 973    tz_suffix = 'Z' if dt.tzinfo is None else ''
 974    return dt.isoformat() + tz_suffix
 975
 976
 977def wget(
 978    url: str,
 979    dest: Optional[Union[str, 'pathlib.Path']] = None,
 980    headers: Optional[Dict[str, Any]] = None,
 981    color: bool = True,
 982    debug: bool = False,
 983    **kw: Any
 984) -> 'pathlib.Path':
 985    """
 986    Mimic `wget` with `requests`.
 987
 988    Parameters
 989    ----------
 990    url: str
 991        The URL to the resource to be downloaded.
 992
 993    dest: Optional[Union[str, pathlib.Path]], default None
 994        The destination path of the downloaded file.
 995        If `None`, save to the current directory.
 996
 997    color: bool, default True
 998        If `debug` is `True`, print color output.
 999
1000    debug: bool, default False
1001        Verbosity toggle.
1002
1003    Returns
1004    -------
1005    The path to the downloaded file.
1006
1007    """
1008    from meerschaum.utils.warnings import warn, error
1009    from meerschaum.utils.debug import dprint
1010    import os, pathlib, re, urllib.request
1011    if headers is None:
1012        headers = {}
1013    request = urllib.request.Request(url, headers=headers)
1014    if not color:
1015        dprint = print
1016    if debug:
1017        dprint(f"Downloading from '{url}'...")
1018    try:
1019        response = urllib.request.urlopen(request)
1020    except Exception as e:
1021        import ssl
1022        ssl._create_default_https_context = ssl._create_unverified_context
1023        try:
1024            response = urllib.request.urlopen(request)
1025        except Exception as _e:
1026            print(_e)
1027            response = None
1028    if response is None or response.code != 200:
1029        error_msg = f"Failed to download from '{url}'."
1030        if color:
1031            error(error_msg)
1032        else:
1033            print(error_msg)
1034            import sys
1035            sys.exit(1)
1036
1037    d = response.headers.get('content-disposition', None)
1038    fname = (
1039        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1040        else url.split('/')[-1]
1041    )
1042
1043    if dest is None:
1044        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1045    elif isinstance(dest, str):
1046        dest = pathlib.Path(dest)
1047
1048    with open(dest, 'wb') as f:
1049        f.write(response.fp.read())
1050
1051    if debug:
1052        dprint(f"Downloaded file '{dest}'.")
1053
1054    return dest
1055
1056
1057def async_wrap(func):
1058    """
1059    Run a synchronous function as async.
1060    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1061    """
1062    import asyncio
1063    from functools import wraps, partial
1064
1065    @wraps(func)
1066    async def run(*args, loop=None, executor=None, **kwargs):
1067        if loop is None:
1068            loop = asyncio.get_event_loop()
1069        pfunc = partial(func, *args, **kwargs)
1070        return await loop.run_in_executor(executor, pfunc)
1071    return run
1072
1073
1074def debug_trace(browser: bool = True):
1075    """
1076    Open a web-based debugger to trace the execution of the program.
1077
1078    This is an alias import for `meerschaum.utils.debug.debug_trace`.
1079    """
1080    from meerschaum.utils.debug import trace
1081    trace(browser=browser)
1082
1083
1084def items_str(
1085    items: List[Any],
1086    quotes: bool = True,
1087    quote_str: str = "'",
1088    commas: bool = True,
1089    comma_str: str = ',',
1090    and_: bool = True,
1091    and_str: str = 'and',
1092    oxford_comma: bool = True,
1093    spaces: bool = True,
1094    space_str = ' ',
1095) -> str:
1096    """
1097    Return a formatted string if list items separated by commas.
1098
1099    Parameters
1100    ----------
1101    items: [List[Any]]
1102        The items to be printed as an English list.
1103
1104    quotes: bool, default True
1105        If `True`, wrap items in quotes.
1106
1107    quote_str: str, default "'"
1108        If `quotes` is `True`, prepend and append each item with this string.
1109
1110    and_: bool, default True
1111        If `True`, include the word 'and' before the final item in the list.
1112
1113    and_str: str, default 'and'
1114        If `and_` is True, insert this string where 'and' normally would in and English list.
1115
1116    oxford_comma: bool, default True
1117        If `True`, include the Oxford Comma (comma before the final 'and').
1118        Only applies when `and_` is `True`.
1119
1120    spaces: bool, default True
1121        If `True`, separate items with `space_str`
1122
1123    space_str: str, default ' '
1124        If `spaces` is `True`, separate items with this string.
1125
1126    Returns
1127    -------
1128    A string of the items as an English list.
1129
1130    Examples
1131    --------
1132    >>> items_str([1,2,3])
1133    "'1', '2', and '3'"
1134    >>> items_str([1,2,3], quotes=False)
1135    '1, 2, and 3'
1136    >>> items_str([1,2,3], and_=False)
1137    "'1', '2', '3'"
1138    >>> items_str([1,2,3], spaces=False, and_=False)
1139    "'1','2','3'"
1140    >>> items_str([1,2,3], oxford_comma=False)
1141    "'1', '2' and '3'"
1142    >>> items_str([1,2,3], quote_str=":")
1143    ':1:, :2:, and :3:'
1144    >>> items_str([1,2,3], and_str="or")
1145    "'1', '2', or '3'"
1146    >>> items_str([1,2,3], space_str="_")
1147    "'1',_'2',_and_'3'"
1148
1149    """
1150    if not items:
1151        return ''
1152    
1153    q = quote_str if quotes else ''
1154    s = space_str if spaces else ''
1155    a = and_str if and_ else ''
1156    c = comma_str if commas else ''
1157
1158    if len(items) == 1:
1159        return q + str(list(items)[0]) + q
1160
1161    if len(items) == 2:
1162        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1163
1164    sep = q + c + s + q
1165    output = q + sep.join(str(i) for i in items[:-1]) + q
1166    if oxford_comma:
1167        output += c
1168    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1169    return output
1170
1171
1172def interval_str(delta: Union[timedelta, int]) -> str:
1173    """
1174    Return a human-readable string for a `timedelta` (or `int` minutes).
1175
1176    Parameters
1177    ----------
1178    delta: Union[timedelta, int]
1179        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
1180
1181    Returns
1182    -------
1183    A formatted string, fit for human eyes.
1184    """
1185    from meerschaum.utils.packages import attempt_import
1186    humanfriendly = attempt_import('humanfriendly')
1187    delta_seconds = (
1188        delta.total_seconds()
1189        if isinstance(delta, timedelta)
1190        else (delta * 60)
1191    )
1192    return humanfriendly.format_timespan(delta_seconds)
1193
1194
1195def is_docker_available() -> bool:
1196    """Check if we can connect to the Docker engine."""
1197    import subprocess
1198    try:
1199        has_docker = subprocess.call(
1200            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1201        ) == 0
1202    except Exception as e:
1203        has_docker = False
1204    return has_docker
1205
1206
1207def is_android() -> bool:
1208    """Return `True` if the current platform is Android."""
1209    import sys
1210    return hasattr(sys, 'getandroidapilevel')
1211
1212
1213def is_bcp_available() -> bool:
1214    """Check if the MSSQL `bcp` utility is installed."""
1215    import subprocess
1216
1217    try:
1218        has_bcp = subprocess.call(
1219            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1220        ) == 0
1221    except Exception as e:
1222        has_bcp = False
1223    return has_bcp
1224
1225
1226def is_systemd_available() -> bool:
1227    """Check if running on systemd."""
1228    import subprocess
1229    try:
1230        has_systemctl = subprocess.call(
1231            ['systemctl', '-h'],
1232            stdout=subprocess.DEVNULL,
1233            stderr=subprocess.STDOUT,
1234        ) == 0
1235    except Exception:
1236        has_systemctl = False
1237    return has_systemctl
1238
1239def get_last_n_lines(file_name: str, N: int):
1240    """
1241    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/
1242    """
1243    import os
1244    # Create an empty list to keep the track of last N lines
1245    list_of_lines = []
1246    # Open file for reading in binary mode
1247    with open(file_name, 'rb') as read_obj:
1248        # Move the cursor to the end of the file
1249        read_obj.seek(0, os.SEEK_END)
1250        # Create a buffer to keep the last read line
1251        buffer = bytearray()
1252        # Get the current position of pointer i.e eof
1253        pointer_location = read_obj.tell()
1254        # Loop till pointer reaches the top of the file
1255        while pointer_location >= 0:
1256            # Move the file pointer to the location pointed by pointer_location
1257            read_obj.seek(pointer_location)
1258            # Shift pointer location by -1
1259            pointer_location = pointer_location -1
1260            # read that byte / character
1261            new_byte = read_obj.read(1)
1262            # If the read byte is new line character then it means one line is read
1263            if new_byte == b'\n':
1264                # Save the line in list of lines
1265                list_of_lines.append(buffer.decode()[::-1])
1266                # If the size of list reaches N, then return the reversed list
1267                if len(list_of_lines) == N:
1268                    return list(reversed(list_of_lines))
1269                # Reinitialize the byte array to save next line
1270                buffer = bytearray()
1271            else:
1272                # If last read character is not eol then add it in buffer
1273                buffer.extend(new_byte)
1274        # As file is read completely, if there is still data in buffer, then its first line.
1275        if len(buffer) > 0:
1276            list_of_lines.append(buffer.decode()[::-1])
1277    # return the reversed list
1278    return list(reversed(list_of_lines))
1279
1280
1281def tail(f, n, offset=None):
1282    """
1283    https://stackoverflow.com/a/692616/9699829
1284    
1285    Reads n lines from f with an offset of offset lines.  The return
1286    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
1287    an indicator that is `True` if there are more lines in the file.
1288    """
1289    avg_line_length = 74
1290    to_read = n + (offset or 0)
1291
1292    while True:
1293        try:
1294            f.seek(-(avg_line_length * to_read), 2)
1295        except IOError:
1296            # woops.  apparently file is smaller than what we want
1297            # to step back, go to the beginning instead
1298            f.seek(0)
1299        pos = f.tell()
1300        lines = f.read().splitlines()
1301        if len(lines) >= to_read or pos == 0:
1302            return lines[-to_read:offset and -offset or None], \
1303                   len(lines) > to_read or pos > 0
1304        avg_line_length *= 1.3
1305
1306
1307def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1308    """
1309    Remove characters from each section of a string until the length is within the limit.
1310
1311    Parameters
1312    ----------
1313    item: str
1314        The item name to be truncated.
1315
1316    delimeter: str, default '_'
1317        Split `item` by this string into several sections.
1318
1319    max_len: int, default 128
1320        The max acceptable length of the truncated version of `item`.
1321
1322    Returns
1323    -------
1324    The truncated string.
1325
1326    Examples
1327    --------
1328    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1329    'ab_de_gh'
1330
1331    """
1332    if len(item) < max_len:
1333        return item
1334
1335    def _shorten(s: str) -> str:
1336        return s[:-1] if len(s) > 1 else s
1337
1338    sections = list(enumerate(item.split('_')))
1339    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1340    available_chars = max_len - len(sections)
1341
1342    _sections = [(i, s) for i, s in sorted_sections]
1343    _sections_len = sum([len(s) for i, s in _sections])
1344    _old_sections_len = _sections_len
1345    while _sections_len > available_chars:
1346        _sections = [(i, _shorten(s)) for i, s in _sections]
1347        _old_sections_len = _sections_len
1348        _sections_len = sum([len(s) for i, s in _sections])
1349        if _old_sections_len == _sections_len:
1350            raise Exception(f"String could not be truncated: '{item}'")
1351
1352    new_sections = sorted(_sections, key=lambda x: x[0])
1353    return delimeter.join([s for i, s in new_sections])
1354
1355
1356def separate_negation_values(
1357    vals: Union[List[str], Tuple[str]],
1358    negation_prefix: Optional[str] = None,
1359) -> Tuple[List[str], List[str]]:
1360    """
1361    Separate the negated values from the positive ones.
1362    Return two lists: positive and negative values.
1363
1364    Parameters
1365    ----------
1366    vals: Union[List[str], Tuple[str]]
1367        A list of strings to parse.
1368
1369    negation_prefix: Optional[str], default None
1370        Include values that start with this string in the second list.
1371        If `None`, use the system default (`_`).
1372    """
1373    if negation_prefix is None:
1374        from meerschaum.config.static import STATIC_CONFIG
1375        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
1376    _in_vals, _ex_vals = [], []
1377    for v in vals:
1378        if str(v).startswith(negation_prefix):
1379            _ex_vals.append(str(v)[len(negation_prefix):])
1380        else:
1381            _in_vals.append(v)
1382
1383    return _in_vals, _ex_vals
1384
1385
1386def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1387    """
1388    Translate a params dictionary into lists of include- and exclude-values.
1389
1390    Parameters
1391    ----------
1392    params: Optional[Dict[str, Any]]
1393        A params query dictionary.
1394
1395    Returns
1396    -------
1397    A dictionary mapping keys to a tuple of lists for include and exclude values.
1398
1399    Examples
1400    --------
1401    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
1402    {'a': (['b', 'c', 'e'], ['d', 'f'])}
1403    """
1404    if not params:
1405        return {}
1406    return {
1407        col: separate_negation_values(
1408            (
1409                val
1410                if isinstance(val, (list, tuple))
1411                else [val]
1412            )
1413        )
1414        for col, val in params.items()
1415    }
1416
1417
1418def flatten_list(list_: List[Any]) -> List[Any]:
1419    """
1420    Recursively flatten a list.
1421    """
1422    for item in list_:
1423        if isinstance(item, list):
1424            yield from flatten_list(item)
1425        else:
1426            yield item
1427
1428
1429def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1430    """
1431    Parse a string containing the text to be passed into a function
1432    and return a tuple of args, kwargs.
1433
1434    Parameters
1435    ----------
1436    args_str: str
1437        The contents of the function parameter (as a string).
1438
1439    Returns
1440    -------
1441    A tuple of args (tuple) and kwargs (dict[str, Any]).
1442
1443    Examples
1444    --------
1445    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
1446    (123, 456), {'foo': 789, 'bar': 'baz'}
1447    """
1448    import ast
1449    args = []
1450    kwargs = {}
1451
1452    for part in args_str.split(','):
1453        if '=' in part:
1454            key, val = part.split('=', 1)
1455            kwargs[key.strip()] = ast.literal_eval(val)
1456        else:
1457            args.append(ast.literal_eval(part.strip()))
1458
1459    return tuple(args), kwargs
1460
1461
1462def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
1463    """
1464    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.
1465
1466    Parameters
1467    ----------
1468    src_path: pathlib.Path
1469        The source path.
1470
1471    dest_path: pathlib.Path
1472        The destination path.
1473
1474    Returns
1475    -------
1476    A SuccessTuple indicating success.
1477    """
1478    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
1479        return True, "Symlink already exists."
1480    try:
1481        dest_path.symlink_to(src_path)
1482        success = True
1483    except Exception as e:
1484        success = False
1485        msg = str(e)
1486    if success:
1487        return success, "Success"
1488
1489    ### Failed to create a symlink.
1490    ### If we're not on Windows, return an error.
1491    import platform
1492    if platform.system() != 'Windows':
1493        return success, msg
1494
1495    try:
1496        import _winapi
1497    except ImportError:
1498        return False, "Unable to import _winapi."
1499
1500    if src_path.is_dir():
1501        try:
1502            _winapi.CreateJunction(str(src_path), str(dest_path))
1503        except Exception as e:
1504            return False, str(e)
1505        return True, "Success"
1506
1507    ### Last resort: copy the file on Windows.
1508    import shutil
1509    try:
1510        shutil.copy(src_path, dest_path)
1511    except Exception as e:
1512        return False, str(e)
1513
1514    return True, "Success"
1515
1516
1517def is_symlink(path: pathlib.Path) -> bool:
1518    """
1519    Wrap `path.is_symlink()` but add support for Windows junctions.
1520    """
1521    if path.is_symlink():
1522        return True
1523    import platform, os
1524    if platform.system() != 'Windows':
1525        return False
1526    try:
1527        return bool(os.readlink(path))
1528    except OSError:
1529        return False
1530
1531
1532def parametrized(dec):
1533    """
1534    A meta-decorator for allowing other decorator functions to have parameters.
1535
1536    https://stackoverflow.com/a/26151604/9699829
1537    """
1538    def layer(*args, **kwargs):
1539        def repl(f):
1540            return dec(f, *args, **kwargs)
1541        return repl
1542    return layer
1543
1544
1545def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
1546    """
1547    Safely extract a TAR file to a give directory.
1548    This defends against CVE-2007-4559.
1549
1550    Parameters
1551    ----------
1552    tarf: file
1553        The TAR file opened with `tarfile.open(path, 'r:gz')`.
1554
1555    output_dir: Union[str, pathlib.Path]
1556        The output directory.
1557    """
1558    import os
1559
1560    def is_within_directory(directory, target):
1561        abs_directory = os.path.abspath(directory)
1562        abs_target = os.path.abspath(target)
1563        prefix = os.path.commonprefix([abs_directory, abs_target])
1564        return prefix == abs_directory 
1565
1566    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
1567        for member in tar.getmembers():
1568            member_path = os.path.join(path, member.name)
1569            if not is_within_directory(path, member_path):
1570                raise Exception("Attempted Path Traversal in Tar File")
1571
1572        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)
1573
1574    return safe_extract(tarf, output_dir)
1575
1576
1577##################
1578# Legacy imports #
1579##################
1580
1581def choose_subaction(*args, **kwargs) -> Any:
1582    """
1583    Placeholder function to prevent breaking legacy behavior.
1584    See `meerschaum.actions.choose_subaction`.
1585    """
1586    from meerschaum.actions import choose_subaction as _choose_subactions
1587    return _choose_subactions(*args, **kwargs)
1588
1589
1590def print_options(*args, **kwargs) -> None:
1591    """
1592    Placeholder function to prevent breaking legacy behavior.
1593    See `meerschaum.utils.formatting.print_options`.
1594    """
1595    from meerschaum.utils.formatting import print_options as _print_options
1596    return _print_options(*args, **kwargs)
1597
1598
1599def to_pandas_dtype(*args, **kwargs) -> Any:
1600    """
1601    Placeholder function to prevent breaking legacy behavior.
1602    See `meerschaum.utils.dtypes.to_pandas_dtype`.
1603    """
1604    from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype
1605    return _to_pandas_dtype(*args, **kwargs)
1606
1607
1608def filter_unseen_df(*args, **kwargs) -> Any:
1609    """
1610    Placeholder function to prevent breaking legacy behavior.
1611    See `meerschaum.utils.dataframe.filter_unseen_df`.
1612    """
1613    from meerschaum.utils.dataframe import filter_unseen_df as real_function
1614    return real_function(*args, **kwargs)
1615
1616
1617def add_missing_cols_to_df(*args, **kwargs) -> Any:
1618    """
1619    Placeholder function to prevent breaking legacy behavior.
1620    See `meerschaum.utils.dataframe.add_missing_cols_to_df`.
1621    """
1622    from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function
1623    return real_function(*args, **kwargs)
1624
1625
1626def parse_df_datetimes(*args, **kwargs) -> Any:
1627    """
1628    Placeholder function to prevent breaking legacy behavior.
1629    See `meerschaum.utils.dataframe.parse_df_datetimes`.
1630    """
1631    from meerschaum.utils.dataframe import parse_df_datetimes as real_function
1632    return real_function(*args, **kwargs)
1633
1634
1635def df_from_literal(*args, **kwargs) -> Any:
1636    """
1637    Placeholder function to prevent breaking legacy behavior.
1638    See `meerschaum.utils.dataframe.df_from_literal`.
1639    """
1640    from meerschaum.utils.dataframe import df_from_literal as real_function
1641    return real_function(*args, **kwargs)
1642
1643
1644def get_json_cols(*args, **kwargs) -> Any:
1645    """
1646    Placeholder function to prevent breaking legacy behavior.
1647    See `meerschaum.utils.dataframe.get_json_cols`.
1648    """
1649    from meerschaum.utils.dataframe import get_json_cols as real_function
1650    return real_function(*args, **kwargs)
1651
1652
1653def get_unhashable_cols(*args, **kwargs) -> Any:
1654    """
1655    Placeholder function to prevent breaking legacy behavior.
1656    See `meerschaum.utils.dataframe.get_unhashable_cols`.
1657    """
1658    from meerschaum.utils.dataframe import get_unhashable_cols as real_function
1659    return real_function(*args, **kwargs)
1660
1661
1662def enforce_dtypes(*args, **kwargs) -> Any:
1663    """
1664    Placeholder function to prevent breaking legacy behavior.
1665    See `meerschaum.utils.dataframe.enforce_dtypes`.
1666    """
1667    from meerschaum.utils.dataframe import enforce_dtypes as real_function
1668    return real_function(*args, **kwargs)
1669
1670
1671def get_datetime_bound_from_df(*args, **kwargs) -> Any:
1672    """
1673    Placeholder function to prevent breaking legacy behavior.
1674    See `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
1675    """
1676    from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function
1677    return real_function(*args, **kwargs)
1678
1679
1680def df_is_chunk_generator(*args, **kwargs) -> Any:
1681    """
1682    Placeholder function to prevent breaking legacy behavior.
1683    See `meerschaum.utils.dataframe.df_is_chunk_generator`.
1684    """
1685    from meerschaum.utils.dataframe import df_is_chunk_generator as real_function
1686    return real_function(*args, **kwargs)
1687
1688
1689def choices_docstring(*args, **kwargs) -> Any:
1690    """
1691    Placeholder function to prevent breaking legacy behavior.
1692    See `meerschaum.actions.choices_docstring`.
1693    """
1694    from meerschaum.actions import choices_docstring as real_function
1695    return real_function(*args, **kwargs)
1696
1697
1698def _get_subaction_names(*args, **kwargs) -> Any:
1699    """
1700    Placeholder function to prevent breaking legacy behavior.
1701    See `meerschaum.actions._get_subaction_names`.
1702    """
1703    from meerschaum.actions import _get_subaction_names as real_function
1704    return real_function(*args, **kwargs)
1705
1706
1707_current_module = sys.modules[__name__]
1708__all__ = tuple(
1709    name
1710    for name, obj in globals().items()
1711    if callable(obj)
1712        and name not in __pdoc__
1713        and getattr(obj, '__module__', None) == _current_module.__name__
1714)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
49def add_method_to_class(
50        func: Callable[[Any], Any],
51        class_def: 'Class',
52        method_name: Optional[str] = None,
53        keep_self: Optional[bool] = None,
54    ) -> Callable[[Any], Any]:
55    """
56    Add function `func` to class `class_def`.
57
58    Parameters
59    ----------
60    func: Callable[[Any], Any]
61        Function to be added as a method of the class
62
63    class_def: Class
64        Class to be modified.
65
66    method_name: Optional[str], default None
67        New name of the method. None will use func.__name__ (default).
68
69    Returns
70    -------
71    The modified function object.
72
73    """
74    from functools import wraps
75
76    is_class = isinstance(class_def, type)
77    
78    @wraps(func)
79    def wrapper(self, *args, **kw):
80        return func(*args, **kw)
81
82    if method_name is None:
83        method_name = func.__name__
84
85    setattr(class_def, method_name, (
86            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
87        )
88    )
89
90    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 93def generate_password(length: int = 12) -> str:
 94    """Generate a secure password of given length.
 95
 96    Parameters
 97    ----------
 98    length : int, default 12
 99        The length of the password.
100
101    Returns
102    -------
103    A random password string.
104
105    """
106    import secrets, string
107    return ''.join((secrets.choice(string.ascii_letters) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
109def is_int(s : str) -> bool:
110    """
111    Check if string is an int.
112
113    Parameters
114    ----------
115    s: str
116        The string to be checked.
117        
118    Returns
119    -------
120    A bool indicating whether the string was able to be cast to an integer.
121
122    """
123    try:
124        float(s)
125    except Exception as e:
126        return False
127    
128    return float(s).is_integer()

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def string_to_dict(params_string: str) -> Dict[str, Any]:
131def string_to_dict(
132        params_string: str
133    ) -> Dict[str, Any]:
134    """
135    Parse a string into a dictionary.
136    
137    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
138
139    Parameters
140    ----------
141    params_string: str
142        The string to be parsed.
143        
144    Returns
145    -------
146    The parsed dictionary.
147
148    Examples
149    --------
150    >>> string_to_dict("a:1,b:2") 
151    {'a': 1, 'b': 2}
152    >>> string_to_dict('{"a": 1, "b": 2}')
153    {'a': 1, 'b': 2}
154
155    """
156    if params_string == "":
157        return {}
158
159    import json
160
161    ### Kind of a weird edge case.
162    ### In the generated compose file, there is some weird escaping happening,
163    ### so the string to be parsed starts and ends with a single quote.
164    if (
165        isinstance(params_string, str)
166        and len(params_string) > 4
167        and params_string[1] == "{"
168        and params_string[-2] == "}"
169    ):
170        return json.loads(params_string[1:-1])
171    if str(params_string).startswith('{'):
172        return json.loads(params_string)
173
174    import ast
175    params_dict = {}
176    for param in params_string.split(","):
177        _keys = param.split(":")
178        keys = _keys[:-1]
179        try:
180            val = ast.literal_eval(_keys[-1])
181        except Exception as e:
182            val = str(_keys[-1])
183
184        c = params_dict
185        for _k in keys[:-1]:
186            try:
187                k = ast.literal_eval(_k)
188            except Exception as e:
189                k = str(_k)
190            if k not in c:
191                c[k] = {}
192            c = c[k]
193
194        c[keys[-1]] = val
195
196    return params_dict

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
199def parse_config_substitution(
200        value: str,
201        leading_key: str = 'MRSM',
202        begin_key: str = '{',
203        end_key: str = '}',
204        delimeter: str = ':'
205    ) -> List[Any]:
206    """
207    Parse Meerschaum substitution syntax
208    E.g. MRSM{value1:value2} => ['value1', 'value2']
209    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
210    """
211    if not value.beginswith(leading_key):
212        return value
213
214    return leading_key[len(leading_key):][len():-1].split(delimeter)

Parse Meerschaum substitution syntax E.g. MRSM{value1:value2} => ['value1', 'value2'] NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
217def edit_file(
218    path: Union['pathlib.Path', str],
219    default_editor: str = 'pyvim',
220    debug: bool = False
221) -> bool:
222    """
223    Open a file for editing.
224
225    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
226
227    Parameters
228    ----------
229    path: Union[pathlib.Path, str]
230        The path to the file to be edited.
231
232    default_editor: str, default 'pyvim'
233        If `$EDITOR` is not set, use this instead.
234        If `pyvim` is not installed, it will install it from PyPI.
235
236    debug: bool, default False
237        Verbosity toggle.
238
239    Returns
240    -------
241    A bool indicating the file was successfully edited.
242    """
243    import os
244    from subprocess import call
245    from meerschaum.utils.debug import dprint
246    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
247    try:
248        EDITOR = os.environ.get('EDITOR', default_editor)
249        if debug:
250            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
251        rc = call([EDITOR, path])
252    except Exception as e: ### can't open with default editors
253        if debug:
254            dprint(str(e))
255            dprint('Failed to open file with system editor. Falling back to pyvim...')
256        pyvim = attempt_import('pyvim', lazy=False)
257        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
258    return rc == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def is_pipe_registered( pipe: meerschaum.Pipe, pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], debug: bool = False) -> bool:
261def is_pipe_registered(
262    pipe: mrsm.Pipe,
263    pipes: PipesDict,
264    debug: bool = False
265) -> bool:
266    """
267    Check if a Pipe is inside the pipes dictionary.
268
269    Parameters
270    ----------
271    pipe: meerschaum.Pipe
272        The pipe to see if it's in the dictionary.
273
274    pipes: PipesDict
275        The dictionary to search inside.
276
277    debug: bool, default False
278        Verbosity toggle.
279
280    Returns
281    -------
282    A bool indicating whether the pipe is inside the dictionary.
283    """
284    from meerschaum.utils.debug import dprint
285    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
286    if debug:
287        dprint(f'{ck}, {mk}, {lk}')
288        dprint(f'{pipe}, {pipes}')
289    return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]

Check if a Pipe is inside the pipes dictionary.

Parameters
  • pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
  • pipes (PipesDict): The dictionary to search inside.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating whether the pipe is inside the dictionary.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
292def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
293    """
294    Determine the columns and lines in the terminal.
295    If they cannot be determined, return the default values (100 columns and 120 lines).
296
297    Parameters
298    ----------
299    default_cols: int, default 100
300        If the columns cannot be determined, return this value.
301
302    default_lines: int, default 120
303        If the lines cannot be determined, return this value.
304
305    Returns
306    -------
307    A tuple if integers for the columns and lines.
308    """
309    import os
310    try:
311        size = os.get_terminal_size()
312        _cols, _lines = size.columns, size.lines
313    except Exception as e:
314        _cols, _lines = (
315            int(os.environ.get('COLUMNS', str(default_cols))),
316            int(os.environ.get('LINES', str(default_lines))),
317        )
318    return _cols, _lines

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple if integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
321def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
322    """
323    Iterate over a list in chunks.
324    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
325
326    Parameters
327    ----------
328    iterable: Iterable[Any]
329        The iterable to iterate over in chunks.
330        
331    chunksize: int
332        The size of chunks to iterate with.
333        
334    fillvalue: Optional[Any], default None
335        If the chunks do not evenly divide into the iterable, pad the end with this value.
336
337    Returns
338    -------
339    A generator of tuples of size `chunksize`.
340
341    """
342    from itertools import zip_longest
343    args = [iter(iterable)] * chunksize
344    return zip_longest(*args, fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
346def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
347    """
348    Sort a dictionary's values and return a new dictionary.
349
350    Parameters
351    ----------
352    d: Dict[Any, Any]
353        The dictionary to be sorted.
354
355    Returns
356    -------
357    A sorted dictionary.
358
359    Examples
360    --------
361    >>> sorted_dict({'b': 1, 'a': 2})
362    {'b': 1, 'a': 2}
363    >>> sorted_dict({'b': 2, 'a': 1})
364    {'a': 1, 'b': 2}
365
366    """
367    try:
368        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
369    except Exception as e:
370        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> 'List[Pipe]':
372def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
373    """
374    Convert the standard pipes dictionary into a list.
375
376    Parameters
377    ----------
378    pipes_dict: PipesDict
379        The pipes dictionary to be flattened.
380
381    Returns
382    -------
383    A list of `Pipe` objects.
384
385    """
386    pipes_list = []
387    for ck in pipes_dict.values():
388        for mk in ck.values():
389            pipes_list += list(mk.values())
390    return pipes_list

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
393def round_time(
394    dt: Optional[datetime] = None,
395    date_delta: Optional[timedelta] = None,
396    to: 'str' = 'down'
397) -> datetime:
398    """
399    Round a datetime object to a multiple of a timedelta.
400    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
401
402    NOTE: This function strips timezone information!
403
404    Parameters
405    ----------
406    dt: Optional[datetime], default None
407        If `None`, grab the current UTC datetime.
408
409    date_delta: Optional[timedelta], default None
410        If `None`, use a delta of 1 minute.
411
412    to: 'str', default 'down'
413        Available options are `'up'`, `'down'`, and `'closest'`.
414
415    Returns
416    -------
417    A rounded `datetime` object.
418
419    Examples
420    --------
421    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
422    datetime.datetime(2022, 1, 1, 12, 15)
423    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
424    datetime.datetime(2022, 1, 1, 12, 16)
425    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
426    datetime.datetime(2022, 1, 1, 12, 0)
427    >>> round_time(
428    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
429    ...   timedelta(hours=1),
430    ...   to = 'closest'
431    ... )
432    datetime.datetime(2022, 1, 1, 12, 0)
433    >>> round_time(
434    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
435    ...   datetime.timedelta(hours=1),
436    ...   to = 'closest'
437    ... )
438    datetime.datetime(2022, 1, 1, 13, 0)
439
440    """
441    if date_delta is None:
442        date_delta = timedelta(minutes=1)
443    round_to = date_delta.total_seconds()
444    if dt is None:
445        dt = datetime.now(timezone.utc).replace(tzinfo=None)
446    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds
447
448    if seconds % round_to == 0 and dt.microsecond == 0:
449        rounding = (seconds + round_to / 2) // round_to * round_to
450    else:
451        if to == 'up':
452            rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to
453        elif to == 'down':
454            rounding = seconds // round_to * round_to
455        else:
456            rounding = (seconds + round_to / 2) // round_to * round_to
457
458    return dt + timedelta(0, rounding - seconds, - dt.microsecond)

Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

NOTE: This function strips timezone information!

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   datetime.timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
461def timed_input(
462        seconds: int = 10,
463        timeout_message: str = "",
464        prompt: str = "",
465        icon: bool = False,
466        **kw
467    ) -> Union[str, None]:
468    """
469    Accept user input only for a brief period of time.
470
471    Parameters
472    ----------
473    seconds: int, default 10
474        The number of seconds to wait.
475
476    timeout_message: str, default ''
477        The message to print after the window has elapsed.
478
479    prompt: str, default ''
480        The prompt to print during the window.
481
482    icon: bool, default False
483        If `True`, print the configured input icon.
484
485
486    Returns
487    -------
488    The input string entered by the user.
489
490    """
491    import signal, time
492
493    class TimeoutExpired(Exception):
494        """Raise this exception when the timeout is reached."""
495
496    def alarm_handler(signum, frame):
497        raise TimeoutExpired
498
499    # set signal handler
500    signal.signal(signal.SIGALRM, alarm_handler)
501    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
502
503    try:
504        return input(prompt)
505    except TimeoutExpired:
506        return None
507    except (EOFError, RuntimeError):
508        try:
509            print(prompt)
510            time.sleep(seconds)
511        except TimeoutExpired:
512            return None
513    finally:
514        signal.alarm(0) # cancel alarm

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def replace_pipes_in_dict( pipes: Optional[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]] = None, func: "'function'" = <class 'str'>, debug: bool = False, **kw) -> Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]:
520def replace_pipes_in_dict(
521        pipes : Optional[PipesDict] = None,
522        func: 'function' = str,
523        debug: bool = False,
524        **kw
525    ) -> PipesDict:
526    """
527    Replace the Pipes in a Pipes dict with the result of another function.
528
529    Parameters
530    ----------
531    pipes: Optional[PipesDict], default None
532        The pipes dict to be processed.
533
534    func: Callable[[Any], Any], default str
535        The function to be applied to every pipe.
536        Defaults to the string constructor.
537
538    debug: bool, default False
539        Verbosity toggle.
540    
541
542    Returns
543    -------
544    A dictionary where every pipe is replaced with the output of a function.
545
546    """
547    import copy
548    def change_dict(d : Dict[Any, Any], func : 'function') -> None:
549        for k, v in d.items():
550            if isinstance(v, dict):
551                change_dict(v, func)
552            else:
553                d[k] = func(v)
554
555    if pipes is None:
556        from meerschaum import get_pipes
557        pipes = get_pipes(debug=debug, **kw)
558
559    result = copy.deepcopy(pipes)
560    change_dict(result, func)
561    return result

Replace the Pipes in a Pipes dict with the result of another function.

Parameters
  • pipes (Optional[PipesDict], default None): The pipes dict to be processed.
  • func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary where every pipe is replaced with the output of a function.
def enforce_gevent_monkey_patch():
563def enforce_gevent_monkey_patch():
564    """
565    Check if gevent monkey patching is enabled, and if not, then apply patching.
566    """
567    from meerschaum.utils.packages import attempt_import
568    import socket
569    gevent, gevent_socket, gevent_monkey = attempt_import(
570        'gevent', 'gevent.socket', 'gevent.monkey'
571    )
572    if not socket.socket is gevent_socket.socket:
573        gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
575def is_valid_email(email: str) -> Union['re.Match', None]:
576    """
577    Check whether a string is a valid email.
578
579    Parameters
580    ----------
581    email: str
582        The string to be examined.
583        
584    Returns
585    -------
586    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
587
588    Examples
589    --------
590    >>> is_valid_email('foo')
591    >>> is_valid_email('foo@foo.com')
592    <re.Match object; span=(0, 11), match='foo@foo.com'>
593
594    """
595    import re
596    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
597    return re.search(regex, email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
600def string_width(string: str, widest: bool = True) -> int:
601    """
602    Calculate the width of a string, either by its widest or last line.
603
604    Parameters
605    ----------
606    string: str:
607        The string to be examined.
608        
609    widest: bool, default True
610        No longer used because `widest` is always assumed to be true.
611
612    Returns
613    -------
614    An integer for the text's visual width.
615
616    Examples
617    --------
618    >>> string_width('a')
619    1
620    >>> string_width('a\\nbc\\nd')
621    2
622
623    """
624    def _widest():
625        words = string.split('\n')
626        max_length = 0
627        for w in words:
628            length = len(w)
629            if length > max_length:
630                max_length = length
631        return max_length
632
633    return _widest()

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str:): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def _pyinstaller_traverse_dir( directory: str, ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), include_dotfiles: bool = False) -> list:
635def _pyinstaller_traverse_dir(
636        directory: str,
637        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
638        include_dotfiles: bool = False
639    ) -> list:
640    """
641    Recursively traverse a directory and return a list of its contents.
642    """
643    import os, pathlib
644    paths = []
645    _directory = pathlib.Path(directory)
646
647    def _found_pattern(name: str):
648        for pattern in ignore_patterns:
649            if pattern.replace('/', os.path.sep) in str(name):
650                return True
651        return False
652
653    for root, dirs, files in os.walk(_directory):
654        _root = str(root)[len(str(_directory.parent)):]
655        if _root.startswith(os.path.sep):
656            _root = _root[len(os.path.sep):]
657        if _root.startswith('.') and not include_dotfiles:
658            continue
659        ### ignore certain patterns
660        if _found_pattern(_root):
661            continue
662
663        for filename in files:
664            if filename.startswith('.') and not include_dotfiles:
665                continue
666            path = os.path.join(root, filename)
667            if _found_pattern(path):
668                continue
669
670            _path = str(path)[len(str(_directory.parent)):]
671            if _path.startswith(os.path.sep):
672                _path = _path[len(os.path.sep):]
673            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
674
675            paths.append((path, _path))
676    return paths

Recursively traverse a directory and return a list of its contents.

def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
679def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
680    """
681    Recursively replace passwords in a dictionary.
682
683    Parameters
684    ----------
685    d: Dict[str, Any]
686        The dictionary to search through.
687
688    replace_with: str, default '*'
689        The string to replace each character of the password with.
690
691    Returns
692    -------
693    Another dictionary where values to the keys `'password'`
694    are replaced with `replace_with` (`'*'`).
695
696    Examples
697    --------
698    >>> replace_password({'a': 1})
699    {'a': 1}
700    >>> replace_password({'password': '123'})
701    {'password': '***'}
702    >>> replace_password({'nested': {'password': '123'}})
703    {'nested': {'password': '***'}}
704    >>> replace_password({'password': '123'}, replace_with='!')
705    {'password': '!!!'}
706
707    """
708    import copy
709    _d = copy.deepcopy(d)
710    for k, v in d.items():
711        if isinstance(v, dict):
712            _d[k] = replace_password(v)
713        elif 'password' in str(k).lower():
714            _d[k] = ''.join([replace_with for char in str(v)])
715        elif str(k).lower() == 'uri':
716            from meerschaum.connectors.sql import SQLConnector
717            try:
718                uri_params = SQLConnector.parse_uri(v)
719            except Exception as e:
720                uri_params = None
721            if not uri_params:
722                continue
723            if not 'username' in uri_params or not 'password' in uri_params:
724                continue
725            _d[k] = v.replace(
726                uri_params['username'] + ':' + uri_params['password'],
727                uri_params['username'] + ':' + ''.join(
728                    [replace_with for char in str(uri_params['password'])]
729                )
730            )
731    return _d

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where values to the keys 'password'
  • are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments( func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Tuple[Tuple[Any], Dict[str, Any]]:
734def filter_arguments(
735    func: Callable[[Any], Any],
736    *args: Any,
737    **kwargs: Any
738) -> Tuple[Tuple[Any], Dict[str, Any]]:
739    """
740    Filter out unsupported positional and keyword arguments.
741
742    Parameters
743    ----------
744    func: Callable[[Any], Any]
745        The function to inspect.
746
747    *args: Any
748        Positional arguments to filter and pass to `func`.
749
750    **kwargs
751        Keyword arguments to filter and pass to `func`.
752
753    Returns
754    -------
755    The `args` and `kwargs` accepted by `func`.
756    """
757    args = filter_positionals(func, *args)
758    kwargs = filter_keywords(func, **kwargs)
759    return args, kwargs

Filter out unsupported positional and keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): Positional arguments to filter and pass to func.
  • **kwargs: Keyword arguments to filter and pass to func.
Returns
  • The args and kwargs accepted by func.
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
762def filter_keywords(
763    func: Callable[[Any], Any],
764    **kw: Any
765) -> Dict[str, Any]:
766    """
767    Filter out unsupported keyword arguments.
768
769    Parameters
770    ----------
771    func: Callable[[Any], Any]
772        The function to inspect.
773
774    **kw: Any
775        The arguments to be filtered and passed into `func`.
776
777    Returns
778    -------
779    A dictionary of keyword arguments accepted by `func`.
780
781    Examples
782    --------
783    ```python
784    >>> def foo(a=1, b=2):
785    ...     return a * b
786    >>> filter_keywords(foo, a=2, b=4, c=6)
787    {'a': 2, 'b': 4}
788    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
789    8
790    ```
791
792    """
793    import inspect
794    func_params = inspect.signature(func).parameters
795    ### If the function has a **kw method, skip filtering.
796    for param, _type in func_params.items():
797        if '**' in str(_type):
798            return kw
799    return {k: v for k, v in kw.items() if k in func_params}

Filter out unsupported keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(func: Callable[[Any], Any], *args: Any) -> Tuple[Any]:
802def filter_positionals(
803    func: Callable[[Any], Any],
804    *args: Any
805) -> Tuple[Any]:
806    """
807    Filter out unsupported positional arguments.
808
809    Parameters
810    ----------
811    func: Callable[[Any], Any]
812        The function to inspect.
813
814    *args: Any
815        The arguments to be filtered and passed into `func`.
816        NOTE: If the function signature expects more arguments than provided,
817        the missing slots will be filled with `None`.
818
819    Returns
820    -------
821    A tuple of positional arguments accepted by `func`.
822
823    Examples
824    --------
825    ```python
826    >>> def foo(a, b):
827    ...     return a * b
828    >>> filter_positionals(foo, 2, 4, 6)
829    (2, 4)
830    >>> foo(*filter_positionals(foo, 2, 4, 6))
831    8
832    ```
833
834    """
835    import inspect
836    from meerschaum.utils.warnings import warn
837    func_params = inspect.signature(func).parameters
838    acceptable_args: List[Any] = []
839
840    def _warn_invalids(_num_invalid):
841        if _num_invalid > 0:
842            warn(
843                "Too few arguments were provided. "
844                + f"{_num_invalid} argument"
845                + ('s have ' if _num_invalid != 1 else " has ")
846                + " been filled with `None`.",
847            )
848
849    num_invalid: int = 0
850    for i, (param, val) in enumerate(func_params.items()):
851        if '=' in str(val) or '*' in str(val):
852            _warn_invalids(num_invalid)
853            return tuple(acceptable_args)
854
855        try:
856            acceptable_args.append(args[i])
857        except IndexError:
858            acceptable_args.append(None)
859            num_invalid += 1
860
861    _warn_invalids(num_invalid)
862    return tuple(acceptable_args)

Filter out unsupported positional arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
  • A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
...     return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
865def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
866    """
867    Convert an ordered dict to a dict.
868    Does not mutate the original OrderedDict.
869    """
870    from collections import OrderedDict
871    _d = dict(od)
872    for k, v in od.items():
873        if isinstance(v, OrderedDict) or (
874            issubclass(type(v), OrderedDict)
875        ):
876            _d[k] = dict_from_od(v)
877    return _d

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
879def remove_ansi(s: str) -> str:
880    """
881    Remove ANSI escape characters from a string.
882
883    Parameters
884    ----------
885    s: str:
886        The string to be cleaned.
887
888    Returns
889    -------
890    A string with the ANSI characters removed.
891
892    Examples
893    --------
894    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
895    'Hello, World!'
896
897    """
898    import re
899    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str:): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("Hello, World!")
'Hello, World!'
def get_connector_labels( *types: str, search_term: str = '', ignore_exact_match=True, _additional_options: Optional[List[str]] = None) -> List[str]:
902def get_connector_labels(
903    *types: str,
904    search_term: str = '',
905    ignore_exact_match = True,
906    _additional_options: Optional[List[str]] = None,
907) -> List[str]:
908    """
909    Read connector labels from the configuration dictionary.
910
911    Parameters
912    ----------
913    *types: str
914        The connector types.
915        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
916
917    search_term: str, default ''
918        A filter on the connectors' labels.
919
920    ignore_exact_match: bool, default True
921        If `True`, skip a connector if the search_term is an exact match.
922
923    Returns
924    -------
925    A list of the keys of defined connectors.
926
927    """
928    from meerschaum.config import get_config
929    connectors = get_config('meerschaum', 'connectors')
930
931    _types = list(types)
932    if len(_types) == 0:
933        _types = list(connectors.keys()) + ['plugin']
934
935    conns = []
936    for t in _types:
937        if t == 'plugin':
938            from meerschaum.plugins import get_data_plugins
939            conns += [
940                f'{t}:' + plugin.module.__name__.split('.')[-1]
941                for plugin in get_data_plugins()
942            ]
943            continue
944        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
945
946    if _additional_options:
947        conns += _additional_options
948
949    possibilities = [
950        c
951        for c in conns
952        if c.startswith(search_term)
953            and c != (
954                search_term if ignore_exact_match else ''
955            )
956    ]
957    return sorted(possibilities)

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
960def json_serialize_datetime(dt: datetime) -> Union[str, None]:
961    """
962    Serialize a datetime object into JSON (ISO format string).
963
964    Examples
965    --------
966    >>> import json
967    >>> from datetime import datetime
968    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
969    '{"a": "2022-01-01T00:00:00Z"}'
970
971    """
972    if not isinstance(dt, datetime):
973        return None
974    tz_suffix = 'Z' if dt.tzinfo is None else ''
975    return dt.isoformat() + tz_suffix

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
 978def wget(
 979    url: str,
 980    dest: Optional[Union[str, 'pathlib.Path']] = None,
 981    headers: Optional[Dict[str, Any]] = None,
 982    color: bool = True,
 983    debug: bool = False,
 984    **kw: Any
 985) -> 'pathlib.Path':
 986    """
 987    Mimic `wget` with `requests`.
 988
 989    Parameters
 990    ----------
 991    url: str
 992        The URL to the resource to be downloaded.
 993
 994    dest: Optional[Union[str, pathlib.Path]], default None
 995        The destination path of the downloaded file.
 996        If `None`, save to the current directory.
 997
 998    color: bool, default True
 999        If `debug` is `True`, print color output.
1000
1001    debug: bool, default False
1002        Verbosity toggle.
1003
1004    Returns
1005    -------
1006    The path to the downloaded file.
1007
1008    """
1009    from meerschaum.utils.warnings import warn, error
1010    from meerschaum.utils.debug import dprint
1011    import os, pathlib, re, urllib.request
1012    if headers is None:
1013        headers = {}
1014    request = urllib.request.Request(url, headers=headers)
1015    if not color:
1016        dprint = print
1017    if debug:
1018        dprint(f"Downloading from '{url}'...")
1019    try:
1020        response = urllib.request.urlopen(request)
1021    except Exception as e:
1022        import ssl
1023        ssl._create_default_https_context = ssl._create_unverified_context
1024        try:
1025            response = urllib.request.urlopen(request)
1026        except Exception as _e:
1027            print(_e)
1028            response = None
1029    if response is None or response.code != 200:
1030        error_msg = f"Failed to download from '{url}'."
1031        if color:
1032            error(error_msg)
1033        else:
1034            print(error_msg)
1035            import sys
1036            sys.exit(1)
1037
1038    d = response.headers.get('content-disposition', None)
1039    fname = (
1040        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1041        else url.split('/')[-1]
1042    )
1043
1044    if dest is None:
1045        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1046    elif isinstance(dest, str):
1047        dest = pathlib.Path(dest)
1048
1049    with open(dest, 'wb') as f:
1050        f.write(response.fp.read())
1051
1052    if debug:
1053        dprint(f"Downloaded file '{dest}'.")
1054
1055    return dest

Mimic wget with requests.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
1058def async_wrap(func):
1059    """
1060    Run a synchronous function as async.
1061    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1062    """
1063    import asyncio
1064    from functools import wraps, partial
1065
1066    @wraps(func)
1067    async def run(*args, loop=None, executor=None, **kwargs):
1068        if loop is None:
1069            loop = asyncio.get_event_loop()
1070        pfunc = partial(func, *args, **kwargs)
1071        return await loop.run_in_executor(executor, pfunc)
1072    return run
def debug_trace(browser: bool = True):
1075def debug_trace(browser: bool = True):
1076    """
1077    Open a web-based debugger to trace the execution of the program.
1078
1079    This is an alias import for `meerschaum.utils.debug.debug_trace`.
1080    """
1081    from meerschaum.utils.debug import trace
1082    trace(browser=browser)

Open a web-based debugger to trace the execution of the program.

This is an alias import for meerschaum.utils.debug.debug_trace.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
1085def items_str(
1086    items: List[Any],
1087    quotes: bool = True,
1088    quote_str: str = "'",
1089    commas: bool = True,
1090    comma_str: str = ',',
1091    and_: bool = True,
1092    and_str: str = 'and',
1093    oxford_comma: bool = True,
1094    spaces: bool = True,
1095    space_str = ' ',
1096) -> str:
1097    """
1098    Return a formatted string if list items separated by commas.
1099
1100    Parameters
1101    ----------
1102    items: [List[Any]]
1103        The items to be printed as an English list.
1104
1105    quotes: bool, default True
1106        If `True`, wrap items in quotes.
1107
1108    quote_str: str, default "'"
1109        If `quotes` is `True`, prepend and append each item with this string.
1110
1111    and_: bool, default True
1112        If `True`, include the word 'and' before the final item in the list.
1113
1114    and_str: str, default 'and'
1115        If `and_` is True, insert this string where 'and' normally would in and English list.
1116
1117    oxford_comma: bool, default True
1118        If `True`, include the Oxford Comma (comma before the final 'and').
1119        Only applies when `and_` is `True`.
1120
1121    spaces: bool, default True
1122        If `True`, separate items with `space_str`
1123
1124    space_str: str, default ' '
1125        If `spaces` is `True`, separate items with this string.
1126
1127    Returns
1128    -------
1129    A string of the items as an English list.
1130
1131    Examples
1132    --------
1133    >>> items_str([1,2,3])
1134    "'1', '2', and '3'"
1135    >>> items_str([1,2,3], quotes=False)
1136    '1, 2, and 3'
1137    >>> items_str([1,2,3], and_=False)
1138    "'1', '2', '3'"
1139    >>> items_str([1,2,3], spaces=False, and_=False)
1140    "'1','2','3'"
1141    >>> items_str([1,2,3], oxford_comma=False)
1142    "'1', '2' and '3'"
1143    >>> items_str([1,2,3], quote_str=":")
1144    ':1:, :2:, and :3:'
1145    >>> items_str([1,2,3], and_str="or")
1146    "'1', '2', or '3'"
1147    >>> items_str([1,2,3], space_str="_")
1148    "'1',_'2',_and_'3'"
1149
1150    """
1151    if not items:
1152        return ''
1153    
1154    q = quote_str if quotes else ''
1155    s = space_str if spaces else ''
1156    a = and_str if and_ else ''
1157    c = comma_str if commas else ''
1158
1159    if len(items) == 1:
1160        return q + str(list(items)[0]) + q
1161
1162    if len(items) == 2:
1163        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1164
1165    sep = q + c + s + q
1166    output = q + sep.join(str(i) for i in items[:-1]) + q
1167    if oxford_comma:
1168        output += c
1169    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1170    return output

Return a formatted string if list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in and English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int]) -> str:
1173def interval_str(delta: Union[timedelta, int]) -> str:
1174    """
1175    Return a human-readable string for a `timedelta` (or `int` minutes).
1176
1177    Parameters
1178    ----------
1179    delta: Union[timedelta, int]
1180        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
1181
1182    Returns
1183    -------
1184    A formatted string, fit for human eyes.
1185    """
1186    from meerschaum.utils.packages import attempt_import
1187    humanfriendly = attempt_import('humanfriendly')
1188    delta_seconds = (
1189        delta.total_seconds()
1190        if isinstance(delta, timedelta)
1191        else (delta * 60)
1192    )
1193    return humanfriendly.format_timespan(delta_seconds)

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
1196def is_docker_available() -> bool:
1197    """Check if we can connect to the Docker engine."""
1198    import subprocess
1199    try:
1200        has_docker = subprocess.call(
1201            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1202        ) == 0
1203    except Exception as e:
1204        has_docker = False
1205    return has_docker

Check if we can connect to the Docker engine.

def is_android() -> bool:
1208def is_android() -> bool:
1209    """Return `True` if the current platform is Android."""
1210    import sys
1211    return hasattr(sys, 'getandroidapilevel')

Return True if the current platform is Android.

def is_bcp_available() -> bool:
1214def is_bcp_available() -> bool:
1215    """Check if the MSSQL `bcp` utility is installed."""
1216    import subprocess
1217
1218    try:
1219        has_bcp = subprocess.call(
1220            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
1221        ) == 0
1222    except Exception as e:
1223        has_bcp = False
1224    return has_bcp

Check if the MSSQL bcp utility is installed.

def is_systemd_available() -> bool:
1227def is_systemd_available() -> bool:
1228    """Check if running on systemd."""
1229    import subprocess
1230    try:
1231        has_systemctl = subprocess.call(
1232            ['systemctl', '-h'],
1233            stdout=subprocess.DEVNULL,
1234            stderr=subprocess.STDOUT,
1235        ) == 0
1236    except Exception:
1237        has_systemctl = False
1238    return has_systemctl

Check if running on systemd.

def get_last_n_lines(file_name: str, N: int):
1240def get_last_n_lines(file_name: str, N: int):
1241    """
1242    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/
1243    """
1244    import os
1245    # Create an empty list to keep the track of last N lines
1246    list_of_lines = []
1247    # Open file for reading in binary mode
1248    with open(file_name, 'rb') as read_obj:
1249        # Move the cursor to the end of the file
1250        read_obj.seek(0, os.SEEK_END)
1251        # Create a buffer to keep the last read line
1252        buffer = bytearray()
1253        # Get the current position of pointer i.e eof
1254        pointer_location = read_obj.tell()
1255        # Loop till pointer reaches the top of the file
1256        while pointer_location >= 0:
1257            # Move the file pointer to the location pointed by pointer_location
1258            read_obj.seek(pointer_location)
1259            # Shift pointer location by -1
1260            pointer_location = pointer_location -1
1261            # read that byte / character
1262            new_byte = read_obj.read(1)
1263            # If the read byte is new line character then it means one line is read
1264            if new_byte == b'\n':
1265                # Save the line in list of lines
1266                list_of_lines.append(buffer.decode()[::-1])
1267                # If the size of list reaches N, then return the reversed list
1268                if len(list_of_lines) == N:
1269                    return list(reversed(list_of_lines))
1270                # Reinitialize the byte array to save next line
1271                buffer = bytearray()
1272            else:
1273                # If last read character is not eol then add it in buffer
1274                buffer.extend(new_byte)
1275        # As file is read completely, if there is still data in buffer, then its first line.
1276        if len(buffer) > 0:
1277            list_of_lines.append(buffer.decode()[::-1])
1278    # return the reversed list
1279    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
1282def tail(f, n, offset=None):
1283    """
1284    https://stackoverflow.com/a/692616/9699829
1285    
1286    Reads n lines from f with an offset of offset lines.  The return
1287    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
1288    an indicator that is `True` if there are more lines in the file.
1289    """
1290    avg_line_length = 74
1291    to_read = n + (offset or 0)
1292
1293    while True:
1294        try:
1295            f.seek(-(avg_line_length * to_read), 2)
1296        except IOError:
1297            # woops.  apparently file is smaller than what we want
1298            # to step back, go to the beginning instead
1299            f.seek(0)
1300        pos = f.tell()
1301        lines = f.read().splitlines()
1302        if len(lines) >= to_read or pos == 0:
1303            return lines[-to_read:offset and -offset or None], \
1304                   len(lines) > to_read or pos > 0
1305        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1308def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1309    """
1310    Remove characters from each section of a string until the length is within the limit.
1311
1312    Parameters
1313    ----------
1314    item: str
1315        The item name to be truncated.
1316
1317    delimeter: str, default '_'
1318        Split `item` by this string into several sections.
1319
1320    max_len: int, default 128
1321        The max acceptable length of the truncated version of `item`.
1322
1323    Returns
1324    -------
1325    The truncated string.
1326
1327    Examples
1328    --------
1329    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1330    'ab_de_gh'
1331
1332    """
1333    if len(item) < max_len:
1334        return item
1335
1336    def _shorten(s: str) -> str:
1337        return s[:-1] if len(s) > 1 else s
1338
1339    sections = list(enumerate(item.split('_')))
1340    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1341    available_chars = max_len - len(sections)
1342
1343    _sections = [(i, s) for i, s in sorted_sections]
1344    _sections_len = sum([len(s) for i, s in _sections])
1345    _old_sections_len = _sections_len
1346    while _sections_len > available_chars:
1347        _sections = [(i, _shorten(s)) for i, s in _sections]
1348        _old_sections_len = _sections_len
1349        _sections_len = sum([len(s) for i, s in _sections])
1350        if _old_sections_len == _sections_len:
1351            raise Exception(f"String could not be truncated: '{item}'")
1352
1353    new_sections = sorted(_sections, key=lambda x: x[0])
1354    return delimeter.join([s for i, s in new_sections])

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
1357def separate_negation_values(
1358    vals: Union[List[str], Tuple[str]],
1359    negation_prefix: Optional[str] = None,
1360) -> Tuple[List[str], List[str]]:
1361    """
1362    Separate the negated values from the positive ones.
1363    Return two lists: positive and negative values.
1364
1365    Parameters
1366    ----------
1367    vals: Union[List[str], Tuple[str]]
1368        A list of strings to parse.
1369
1370    negation_prefix: Optional[str], default None
1371        Include values that start with this string in the second list.
1372        If `None`, use the system default (`_`).
1373    """
1374    if negation_prefix is None:
1375        from meerschaum.config.static import STATIC_CONFIG
1376        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
1377    _in_vals, _ex_vals = [], []
1378    for v in vals:
1379        if str(v).startswith(negation_prefix):
1380            _ex_vals.append(str(v)[len(negation_prefix):])
1381        else:
1382            _in_vals.append(v)
1383
1384    return _in_vals, _ex_vals

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1387def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
1388    """
1389    Translate a params dictionary into lists of include- and exclude-values.
1390
1391    Parameters
1392    ----------
1393    params: Optional[Dict[str, Any]]
1394        A params query dictionary.
1395
1396    Returns
1397    -------
1398    A dictionary mapping keys to a tuple of lists for include and exclude values.
1399
1400    Examples
1401    --------
1402    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
1403    {'a': (['b', 'c', 'e'], ['d', 'f'])}
1404    """
1405    if not params:
1406        return {}
1407    return {
1408        col: separate_negation_values(
1409            (
1410                val
1411                if isinstance(val, (list, tuple))
1412                else [val]
1413            )
1414        )
1415        for col, val in params.items()
1416    }

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
1419def flatten_list(list_: List[Any]) -> List[Any]:
1420    """
1421    Recursively flatten a list.
1422    """
1423    for item in list_:
1424        if isinstance(item, list):
1425            yield from flatten_list(item)
1426        else:
1427            yield item

Recursively flatten a list.

def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1430def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
1431    """
1432    Parse a string containing the text to be passed into a function
1433    and return a tuple of args, kwargs.
1434
1435    Parameters
1436    ----------
1437    args_str: str
1438        The contents of the function parameter (as a string).
1439
1440    Returns
1441    -------
1442    A tuple of args (tuple) and kwargs (dict[str, Any]).
1443
1444    Examples
1445    --------
1446    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
1447    (123, 456), {'foo': 789, 'bar': 'baz'}
1448    """
1449    import ast
1450    args = []
1451    kwargs = {}
1452
1453    for part in args_str.split(','):
1454        if '=' in part:
1455            key, val = part.split('=', 1)
1456            kwargs[key.strip()] = ast.literal_eval(val)
1457        else:
1458            args.append(ast.literal_eval(part.strip()))
1459
1460    return tuple(args), kwargs

Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.

Parameters
  • args_str (str): The contents of the function parameter (as a string).
Returns
  • A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
def parametrized(dec):
1533def parametrized(dec):
1534    """
1535    A meta-decorator for allowing other decorator functions to have parameters.
1536
1537    https://stackoverflow.com/a/26151604/9699829
1538    """
1539    def layer(*args, **kwargs):
1540        def repl(f):
1541            return dec(f, *args, **kwargs)
1542        return repl
1543    return layer

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
1546def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
1547    """
1548    Safely extract a TAR file to a give directory.
1549    This defends against CVE-2007-4559.
1550
1551    Parameters
1552    ----------
1553    tarf: file
1554        The TAR file opened with `tarfile.open(path, 'r:gz')`.
1555
1556    output_dir: Union[str, pathlib.Path]
1557        The output directory.
1558    """
1559    import os
1560
1561    def is_within_directory(directory, target):
1562        abs_directory = os.path.abspath(directory)
1563        abs_target = os.path.abspath(target)
1564        prefix = os.path.commonprefix([abs_directory, abs_target])
1565        return prefix == abs_directory 
1566
1567    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
1568        for member in tar.getmembers():
1569            member_path = os.path.join(path, member.name)
1570            if not is_within_directory(path, member_path):
1571                raise Exception("Attempted Path Traversal in Tar File")
1572
1573        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)
1574
1575    return safe_extract(tarf, output_dir)

Safely extract a TAR file to a give directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def choose_subaction(*args, **kwargs) -> Any:
1582def choose_subaction(*args, **kwargs) -> Any:
1583    """
1584    Placeholder function to prevent breaking legacy behavior.
1585    See `meerschaum.actions.choose_subaction`.
1586    """
1587    from meerschaum.actions import choose_subaction as _choose_subactions
1588    return _choose_subactions(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choose_subaction.