meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9from datetime import timedelta, datetime, timezone
  10from meerschaum.utils.typing import (
  11    Union,
  12    Any,
  13    Callable,
  14    Optional,
  15    List,
  16    Dict,
  17    SuccessTuple,
  18    Iterable,
  19    PipesDict,
  20    Tuple,
  21    InstanceConnector,
  22    Hashable,
  23    Generator,
  24    Iterator,
  25)
  26import meerschaum as mrsm
  27
### Documentation overrides read by pdoc: per pdoc's `__pdoc__` convention,
### a name mapped to `False` is left out of the generated documentation.
__pdoc__: Dict[str, bool] = {
    'to_pandas_dtype': False,
    'filter_unseen_df': False,
    'add_missing_cols_to_df': False,
    'parse_df_datetimes': False,
    'df_from_literal': False,
    'get_json_cols': False,
    'get_unhashable_cols': False,
    'enforce_dtypes': False,
    'get_datetime_bound_from_df': False,
    'df_is_chunk_generator': False,
    'choices_docstring': False,
    '_get_subaction_names': False,
}
  42
  43
  44def add_method_to_class(
  45        func: Callable[[Any], Any],
  46        class_def: 'Class',
  47        method_name: Optional[str] = None,
  48        keep_self: Optional[bool] = None,
  49    ) -> Callable[[Any], Any]:
  50    """
  51    Add function `func` to class `class_def`.
  52
  53    Parameters
  54    ----------
  55    func: Callable[[Any], Any]
  56        Function to be added as a method of the class
  57
  58    class_def: Class
  59        Class to be modified.
  60
  61    method_name: Optional[str], default None
  62        New name of the method. None will use func.__name__ (default).
  63
  64    Returns
  65    -------
  66    The modified function object.
  67
  68    """
  69    from functools import wraps
  70
  71    is_class = isinstance(class_def, type)
  72    
  73    @wraps(func)
  74    def wrapper(self, *args, **kw):
  75        return func(*args, **kw)
  76
  77    if method_name is None:
  78        method_name = func.__name__
  79
  80    setattr(class_def, method_name, (
  81            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  82        )
  83    )
  84
  85    return func
  86
  87
  88def generate_password(length: int = 12) -> str:
  89    """Generate a secure password of given length.
  90
  91    Parameters
  92    ----------
  93    length : int, default 12
  94        The length of the password.
  95
  96    Returns
  97    -------
  98    A random password string.
  99
 100    """
 101    import secrets, string
 102    return ''.join((secrets.choice(string.ascii_letters) for i in range(length)))
 103
 104def is_int(s : str) -> bool:
 105    """
 106    Check if string is an int.
 107
 108    Parameters
 109    ----------
 110    s: str
 111        The string to be checked.
 112        
 113    Returns
 114    -------
 115    A bool indicating whether the string was able to be cast to an integer.
 116
 117    """
 118    try:
 119        float(s)
 120    except Exception as e:
 121        return False
 122    
 123    return float(s).is_integer()
 124
 125
 126def string_to_dict(
 127        params_string: str
 128    ) -> Dict[str, Any]:
 129    """
 130    Parse a string into a dictionary.
 131    
 132    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 133
 134    Parameters
 135    ----------
 136    params_string: str
 137        The string to be parsed.
 138        
 139    Returns
 140    -------
 141    The parsed dictionary.
 142
 143    Examples
 144    --------
 145    >>> string_to_dict("a:1,b:2") 
 146    {'a': 1, 'b': 2}
 147    >>> string_to_dict('{"a": 1, "b": 2}')
 148    {'a': 1, 'b': 2}
 149
 150    """
 151    if params_string == "":
 152        return {}
 153
 154    import json
 155
 156    ### Kind of a weird edge case.
 157    ### In the generated compose file, there is some weird escaping happening,
 158    ### so the string to be parsed starts and ends with a single quote.
 159    if (
 160        isinstance(params_string, str)
 161        and len(params_string) > 4
 162        and params_string[1] == "{"
 163        and params_string[-2] == "}"
 164    ):
 165        return json.loads(params_string[1:-1])
 166    if str(params_string).startswith('{'):
 167        return json.loads(params_string)
 168
 169    import ast
 170    params_dict = {}
 171    for param in params_string.split(","):
 172        _keys = param.split(":")
 173        keys = _keys[:-1]
 174        try:
 175            val = ast.literal_eval(_keys[-1])
 176        except Exception as e:
 177            val = str(_keys[-1])
 178
 179        c = params_dict
 180        for _k in keys[:-1]:
 181            try:
 182                k = ast.literal_eval(_k)
 183            except Exception as e:
 184                k = str(_k)
 185            if k not in c:
 186                c[k] = {}
 187            c = c[k]
 188
 189        c[keys[-1]] = val
 190
 191    return params_dict
 192
 193
 194def parse_config_substitution(
 195        value: str,
 196        leading_key: str = 'MRSM',
 197        begin_key: str = '{',
 198        end_key: str = '}',
 199        delimeter: str = ':'
 200    ) -> List[Any]:
 201    """
 202    Parse Meerschaum substitution syntax
 203    E.g. MRSM{value1:value2} => ['value1', 'value2']
 204    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 205    """
 206    if not value.beginswith(leading_key):
 207        return value
 208
 209    return leading_key[len(leading_key):][len():-1].split(delimeter)
 210
 211def edit_file(
 212        path: Union[pathlib.Path, str],
 213        default_editor: str = 'pyvim',
 214        debug: bool = False
 215    ) -> bool:
 216    """
 217    Open a file for editing.
 218    
 219    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 220
 221    Parameters
 222    ----------
 223    path: Union[pathlib.Path, str]
 224        The path to the file to be edited.
 225        
 226    default_editor: str, default 'pyvim'
 227        If `$EDITOR` is not set, use this instead.
 228        If `pyvim` is not installed, it will install it from PyPI.
 229
 230    debug: bool, default False
 231        Verbosity toggle.
 232
 233    Returns
 234    -------
 235    A bool indicating the file was successfully edited.
 236
 237    """
 238    import os
 239    from subprocess import call
 240    from meerschaum.utils.debug import dprint
 241    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 242    try:
 243        EDITOR = os.environ.get('EDITOR', default_editor)
 244        if debug:
 245            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 246        rc = call([EDITOR, path])
 247    except Exception as e: ### can't open with default editors
 248        if debug:
 249            dprint(e)
 250            dprint('Failed to open file with system editor. Falling back to pyvim...')
 251        pyvim = attempt_import('pyvim', lazy=False)
 252        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 253    return rc == 0
 254
 255
 256def is_pipe_registered(
 257        pipe: 'meerschaum.Pipe',
 258        pipes: PipesDict,
 259        debug: bool = False
 260    ) -> bool:
 261    """
 262    Check if a Pipe is inside the pipes dictionary.
 263
 264    Parameters
 265    ----------
 266    pipe: meerschaum.Pipe
 267        The pipe to see if it's in the dictionary.
 268        
 269    pipes: PipesDict
 270        The dictionary to search inside.
 271        
 272    debug: bool, default False
 273        Verbosity toggle.
 274
 275    Returns
 276    -------
 277    A bool indicating whether the pipe is inside the dictionary.
 278    """
 279    from meerschaum.utils.debug import dprint
 280    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
 281    if debug:
 282        dprint(f'{ck}, {mk}, {lk}')
 283        dprint(f'{pipe}, {pipes}')
 284    return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
 285
 286
 287def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 288    """
 289    Determine the columns and lines in the terminal.
 290    If they cannot be determined, return the default values (100 columns and 120 lines).
 291
 292    Parameters
 293    ----------
 294    default_cols: int, default 100
 295        If the columns cannot be determined, return this value.
 296
 297    default_lines: int, default 120
 298        If the lines cannot be determined, return this value.
 299
 300    Returns
 301    -------
 302    A tuple if integers for the columns and lines.
 303    """
 304    import os
 305    try:
 306        size = os.get_terminal_size()
 307        _cols, _lines = size.columns, size.lines
 308    except Exception as e:
 309        _cols, _lines = (
 310            int(os.environ.get('COLUMNS', str(default_cols))),
 311            int(os.environ.get('LINES', str(default_lines))),
 312        )
 313    return _cols, _lines
 314
 315
 316def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 317    """
 318    Iterate over a list in chunks.
 319    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 320
 321    Parameters
 322    ----------
 323    iterable: Iterable[Any]
 324        The iterable to iterate over in chunks.
 325        
 326    chunksize: int
 327        The size of chunks to iterate with.
 328        
 329    fillvalue: Optional[Any], default None
 330        If the chunks do not evenly divide into the iterable, pad the end with this value.
 331
 332    Returns
 333    -------
 334    A generator of tuples of size `chunksize`.
 335
 336    """
 337    from itertools import zip_longest
 338    args = [iter(iterable)] * chunksize
 339    return zip_longest(*args, fillvalue=fillvalue)
 340
 341def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 342    """
 343    Sort a dictionary's values and return a new dictionary.
 344
 345    Parameters
 346    ----------
 347    d: Dict[Any, Any]
 348        The dictionary to be sorted.
 349
 350    Returns
 351    -------
 352    A sorted dictionary.
 353
 354    Examples
 355    --------
 356    >>> sorted_dict({'b': 1, 'a': 2})
 357    {'b': 1, 'a': 2}
 358    >>> sorted_dict({'b': 2, 'a': 1})
 359    {'a': 1, 'b': 2}
 360
 361    """
 362    try:
 363        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 364    except Exception as e:
 365        return d
 366
 367def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
 368    """
 369    Convert the standard pipes dictionary into a list.
 370
 371    Parameters
 372    ----------
 373    pipes_dict: PipesDict
 374        The pipes dictionary to be flattened.
 375
 376    Returns
 377    -------
 378    A list of `Pipe` objects.
 379
 380    """
 381    pipes_list = []
 382    for ck in pipes_dict.values():
 383        for mk in ck.values():
 384            pipes_list += list(mk.values())
 385    return pipes_list
 386
 387
 388def round_time(
 389        dt: Optional[datetime] = None,
 390        date_delta: Optional[timedelta] = None,
 391        to: 'str' = 'down'
 392    ) -> datetime:
 393    """
 394    Round a datetime object to a multiple of a timedelta.
 395    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
 396
 397    NOTE: This function strips timezone information!
 398
 399    Parameters
 400    ----------
 401    dt: Optional[datetime], default None
 402        If `None`, grab the current UTC datetime.
 403
 404    date_delta: Optional[timedelta], default None
 405        If `None`, use a delta of 1 minute.
 406
 407    to: 'str', default 'down'
 408        Available options are `'up'`, `'down'`, and `'closest'`.
 409
 410    Returns
 411    -------
 412    A rounded `datetime` object.
 413
 414    Examples
 415    --------
 416    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
 417    datetime.datetime(2022, 1, 1, 12, 15)
 418    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
 419    datetime.datetime(2022, 1, 1, 12, 16)
 420    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
 421    datetime.datetime(2022, 1, 1, 12, 0)
 422    >>> round_time(
 423    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
 424    ...   timedelta(hours=1),
 425    ...   to = 'closest'
 426    ... )
 427    datetime.datetime(2022, 1, 1, 12, 0)
 428    >>> round_time(
 429    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
 430    ...   datetime.timedelta(hours=1),
 431    ...   to = 'closest'
 432    ... )
 433    datetime.datetime(2022, 1, 1, 13, 0)
 434
 435    """
 436    if date_delta is None:
 437        date_delta = timedelta(minutes=1)
 438    round_to = date_delta.total_seconds()
 439    if dt is None:
 440        dt = datetime.now(timezone.utc).replace(tzinfo=None)
 441    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds
 442
 443    if seconds % round_to == 0 and dt.microsecond == 0:
 444        rounding = (seconds + round_to / 2) // round_to * round_to
 445    else:
 446        if to == 'up':
 447            rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to
 448        elif to == 'down':
 449            rounding = seconds // round_to * round_to
 450        else:
 451            rounding = (seconds + round_to / 2) // round_to * round_to
 452
 453    return dt + timedelta(0, rounding - seconds, - dt.microsecond)
 454
 455
 456def timed_input(
 457        seconds: int = 10,
 458        timeout_message: str = "",
 459        prompt: str = "",
 460        icon: bool = False,
 461        **kw
 462    ) -> Union[str, None]:
 463    """
 464    Accept user input only for a brief period of time.
 465
 466    Parameters
 467    ----------
 468    seconds: int, default 10
 469        The number of seconds to wait.
 470
 471    timeout_message: str, default ''
 472        The message to print after the window has elapsed.
 473
 474    prompt: str, default ''
 475        The prompt to print during the window.
 476
 477    icon: bool, default False
 478        If `True`, print the configured input icon.
 479
 480
 481    Returns
 482    -------
 483    The input string entered by the user.
 484
 485    """
 486    import signal, time
 487
 488    class TimeoutExpired(Exception):
 489        """Raise this exception when the timeout is reached."""
 490
 491    def alarm_handler(signum, frame):
 492        raise TimeoutExpired
 493
 494    # set signal handler
 495    signal.signal(signal.SIGALRM, alarm_handler)
 496    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 497
 498    try:
 499        return input(prompt)
 500    except TimeoutExpired:
 501        return None
 502    except (EOFError, RuntimeError):
 503        try:
 504            print(prompt)
 505            time.sleep(seconds)
 506        except TimeoutExpired:
 507            return None
 508    finally:
 509        signal.alarm(0) # cancel alarm
 510
 511
 512
 513
 514
 515def replace_pipes_in_dict(
 516        pipes : Optional[PipesDict] = None,
 517        func: 'function' = str,
 518        debug: bool = False,
 519        **kw
 520    ) -> PipesDict:
 521    """
 522    Replace the Pipes in a Pipes dict with the result of another function.
 523
 524    Parameters
 525    ----------
 526    pipes: Optional[PipesDict], default None
 527        The pipes dict to be processed.
 528
 529    func: Callable[[Any], Any], default str
 530        The function to be applied to every pipe.
 531        Defaults to the string constructor.
 532
 533    debug: bool, default False
 534        Verbosity toggle.
 535    
 536
 537    Returns
 538    -------
 539    A dictionary where every pipe is replaced with the output of a function.
 540
 541    """
 542    import copy
 543    def change_dict(d : Dict[Any, Any], func : 'function') -> None:
 544        for k, v in d.items():
 545            if isinstance(v, dict):
 546                change_dict(v, func)
 547            else:
 548                d[k] = func(v)
 549
 550    if pipes is None:
 551        from meerschaum import get_pipes
 552        pipes = get_pipes(debug=debug, **kw)
 553
 554    result = copy.deepcopy(pipes)
 555    change_dict(result, func)
 556    return result
 557
 558def enforce_gevent_monkey_patch():
 559    """
 560    Check if gevent monkey patching is enabled, and if not, then apply patching.
 561    """
 562    from meerschaum.utils.packages import attempt_import
 563    import socket
 564    gevent, gevent_socket, gevent_monkey = attempt_import(
 565        'gevent', 'gevent.socket', 'gevent.monkey'
 566    )
 567    if not socket.socket is gevent_socket.socket:
 568        gevent_monkey.patch_all()
 569
 570def is_valid_email(email: str) -> Union['re.Match', None]:
 571    """
 572    Check whether a string is a valid email.
 573
 574    Parameters
 575    ----------
 576    email: str
 577        The string to be examined.
 578        
 579    Returns
 580    -------
 581    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 582
 583    Examples
 584    --------
 585    >>> is_valid_email('foo')
 586    >>> is_valid_email('foo@foo.com')
 587    <re.Match object; span=(0, 11), match='foo@foo.com'>
 588
 589    """
 590    import re
 591    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 592    return re.search(regex, email)
 593
 594
 595def string_width(string: str, widest: bool = True) -> int:
 596    """
 597    Calculate the width of a string, either by its widest or last line.
 598
 599    Parameters
 600    ----------
 601    string: str:
 602        The string to be examined.
 603        
 604    widest: bool, default True
 605        No longer used because `widest` is always assumed to be true.
 606
 607    Returns
 608    -------
 609    An integer for the text's visual width.
 610
 611    Examples
 612    --------
 613    >>> string_width('a')
 614    1
 615    >>> string_width('a\\nbc\\nd')
 616    2
 617
 618    """
 619    def _widest():
 620        words = string.split('\n')
 621        max_length = 0
 622        for w in words:
 623            length = len(w)
 624            if length > max_length:
 625                max_length = length
 626        return max_length
 627
 628    return _widest()
 629
 630def _pyinstaller_traverse_dir(
 631        directory: str,
 632        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 633        include_dotfiles: bool = False
 634    ) -> list:
 635    """
 636    Recursively traverse a directory and return a list of its contents.
 637    """
 638    import os, pathlib
 639    paths = []
 640    _directory = pathlib.Path(directory)
 641
 642    def _found_pattern(name: str):
 643        for pattern in ignore_patterns:
 644            if pattern.replace('/', os.path.sep) in str(name):
 645                return True
 646        return False
 647
 648    for root, dirs, files in os.walk(_directory):
 649        _root = str(root)[len(str(_directory.parent)):]
 650        if _root.startswith(os.path.sep):
 651            _root = _root[len(os.path.sep):]
 652        if _root.startswith('.') and not include_dotfiles:
 653            continue
 654        ### ignore certain patterns
 655        if _found_pattern(_root):
 656            continue
 657
 658        for filename in files:
 659            if filename.startswith('.') and not include_dotfiles:
 660                continue
 661            path = os.path.join(root, filename)
 662            if _found_pattern(path):
 663                continue
 664
 665            _path = str(path)[len(str(_directory.parent)):]
 666            if _path.startswith(os.path.sep):
 667                _path = _path[len(os.path.sep):]
 668            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 669
 670            paths.append((path, _path))
 671    return paths
 672
 673
 674def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 675    """
 676    Recursively replace passwords in a dictionary.
 677
 678    Parameters
 679    ----------
 680    d: Dict[str, Any]
 681        The dictionary to search through.
 682
 683    replace_with: str, default '*'
 684        The string to replace each character of the password with.
 685
 686    Returns
 687    -------
 688    Another dictionary where values to the keys `'password'`
 689    are replaced with `replace_with` (`'*'`).
 690
 691    Examples
 692    --------
 693    >>> replace_password({'a': 1})
 694    {'a': 1}
 695    >>> replace_password({'password': '123'})
 696    {'password': '***'}
 697    >>> replace_password({'nested': {'password': '123'}})
 698    {'nested': {'password': '***'}}
 699    >>> replace_password({'password': '123'}, replace_with='!')
 700    {'password': '!!!'}
 701
 702    """
 703    import copy
 704    _d = copy.deepcopy(d)
 705    for k, v in d.items():
 706        if isinstance(v, dict):
 707            _d[k] = replace_password(v)
 708        elif 'password' in str(k).lower():
 709            _d[k] = ''.join([replace_with for char in str(v)])
 710        elif str(k).lower() == 'uri':
 711            from meerschaum.connectors.sql import SQLConnector
 712            try:
 713                uri_params = SQLConnector.parse_uri(v)
 714            except Exception as e:
 715                uri_params = None
 716            if not uri_params:
 717                continue
 718            if not 'username' in uri_params or not 'password' in uri_params:
 719                continue
 720            _d[k] = v.replace(
 721                uri_params['username'] + ':' + uri_params['password'],
 722                uri_params['username'] + ':' + ''.join(
 723                    [replace_with for char in str(uri_params['password'])]
 724                )
 725            )
 726    return _d
 727
 728
 729def filter_keywords(
 730        func: Callable[[Any], Any],
 731        **kw: Any
 732    ) -> Dict[str, Any]:
 733    """
 734    Filter out unsupported keywords.
 735
 736    Parameters
 737    ----------
 738    func: Callable[[Any], Any]
 739        The function to inspect.
 740        
 741    **kw: Any
 742        The arguments to be filtered and passed into `func`.
 743
 744    Returns
 745    -------
 746    A dictionary of keyword arguments accepted by `func`.
 747    
 748    Examples
 749    --------
 750    ```python
 751    >>> def foo(a=1, b=2):
 752    ...     return a * b
 753    >>> filter_keywords(foo, a=2, b=4, c=6)
 754    {'a': 2, 'b': 4}
 755    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 756    8
 757    ```
 758
 759    """
 760    import inspect
 761    func_params = inspect.signature(func).parameters
 762    ### If the function has a **kw method, skip filtering.
 763    for param, _type in func_params.items():
 764        if '**' in str(_type):
 765            return kw
 766    return {k: v for k, v in kw.items() if k in func_params}
 767
 768
 769def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 770    """
 771    Convert an ordered dict to a dict.
 772    Does not mutate the original OrderedDict.
 773    """
 774    from collections import OrderedDict
 775    _d = dict(od)
 776    for k, v in od.items():
 777        if isinstance(v, OrderedDict) or (
 778            issubclass(type(v), OrderedDict)
 779        ):
 780            _d[k] = dict_from_od(v)
 781    return _d
 782
 783def remove_ansi(s : str) -> str:
 784    """
 785    Remove ANSI escape characters from a string.
 786
 787    Parameters
 788    ----------
 789    s: str:
 790        The string to be cleaned.
 791
 792    Returns
 793    -------
 794    A string with the ANSI characters removed.
 795
 796    Examples
 797    --------
 798    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 799    'Hello, World!'
 800
 801    """
 802    import re
 803    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 804
 805
 806def get_connector_labels(
 807        *types: str,
 808        search_term: str = '',
 809        ignore_exact_match = True,
 810    ) -> List[str]:
 811    """
 812    Read connector labels from the configuration dictionary.
 813
 814    Parameters
 815    ----------
 816    *types: str
 817        The connector types.
 818        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 819
 820    search_term: str, default ''
 821        A filter on the connectors' labels.
 822
 823    ignore_exact_match: bool, default True
 824        If `True`, skip a connector if the search_term is an exact match.
 825
 826    Returns
 827    -------
 828    A list of the keys of defined connectors.
 829
 830    """
 831    from meerschaum.config import get_config
 832    connectors = get_config('meerschaum', 'connectors')
 833
 834    _types = list(types)
 835    if len(_types) == 0:
 836        _types = list(connectors.keys()) + ['plugin']
 837
 838    conns = []
 839    for t in _types:
 840        if t == 'plugin':
 841            from meerschaum.plugins import get_data_plugins
 842            conns += [
 843                f'{t}:' + plugin.module.__name__.split('.')[-1]
 844                for plugin in get_data_plugins()
 845            ]
 846            continue
 847        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 848
 849    possibilities = [
 850        c for c in conns
 851            if c.startswith(search_term)
 852                and c != (
 853                    search_term if ignore_exact_match else ''
 854                )
 855    ]
 856    return sorted(possibilities)
 857
 858
 859def json_serialize_datetime(dt: datetime) -> Union[str, None]:
 860    """
 861    Serialize a datetime object into JSON (ISO format string).
 862    
 863    Examples
 864    --------
 865    >>> import json
 866    >>> from datetime import datetime
 867    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
 868    '{"a": "2022-01-01T00:00:00Z"}'
 869
 870    """
 871    if not isinstance(dt, datetime):
 872        return None
 873    tz_suffix = 'Z' if dt.tzinfo is None else ''
 874    return dt.isoformat() + tz_suffix
 875
 876
 877def wget(
 878        url: str,
 879        dest: Optional[Union[str, 'pathlib.Path']] = None,
 880        headers: Optional[Dict[str, Any]] = None,
 881        color: bool = True,
 882        debug: bool = False,
 883        **kw: Any
 884    ) -> 'pathlib.Path':
 885    """
 886    Mimic `wget` with `requests`.
 887
 888    Parameters
 889    ----------
 890    url: str
 891        The URL to the resource to be downloaded.
 892        
 893    dest: Optional[Union[str, pathlib.Path]], default None
 894        The destination path of the downloaded file.
 895        If `None`, save to the current directory.
 896
 897    color: bool, default True
 898        If `debug` is `True`, print color output.
 899
 900    debug: bool, default False
 901        Verbosity toggle.
 902
 903    Returns
 904    -------
 905    The path to the downloaded file.
 906
 907    """
 908    from meerschaum.utils.warnings import warn, error
 909    from meerschaum.utils.debug import dprint
 910    import os, pathlib, re, urllib.request
 911    if headers is None:
 912        headers = {}
 913    request = urllib.request.Request(url, headers=headers)
 914    if not color:
 915        dprint = print
 916    if debug:
 917        dprint(f"Downloading from '{url}'...")
 918    try:
 919        response = urllib.request.urlopen(request)
 920    except Exception as e:
 921        import ssl
 922        ssl._create_default_https_context = ssl._create_unverified_context
 923        try:
 924            response = urllib.request.urlopen(request)
 925        except Exception as _e:
 926            print(_e)
 927            response = None
 928    if response is None or response.code != 200:
 929        error_msg = f"Failed to download from '{url}'."
 930        if color:
 931            error(error_msg)
 932        else:
 933            print(error_msg)
 934            import sys
 935            sys.exit(1)
 936
 937    d = response.headers.get('content-disposition', None)
 938    fname = (
 939        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
 940        else url.split('/')[-1]
 941    )
 942
 943    if dest is None:
 944        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
 945    elif isinstance(dest, str):
 946        dest = pathlib.Path(dest)
 947
 948    with open(dest, 'wb') as f:
 949        f.write(response.fp.read())
 950
 951    if debug:
 952        dprint(f"Downloaded file '{dest}'.")
 953
 954    return dest
 955
 956
 957def async_wrap(func):
 958    """
 959    Run a synchronous function as async.
 960    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
 961    """
 962    import asyncio
 963    from functools import wraps, partial
 964
 965    @wraps(func)
 966    async def run(*args, loop=None, executor=None, **kwargs):
 967        if loop is None:
 968            loop = asyncio.get_event_loop()
 969        pfunc = partial(func, *args, **kwargs)
 970        return await loop.run_in_executor(executor, pfunc)
 971    return run 
 972
 973
 974def debug_trace(browser: bool = True):
 975    """
 976    Open a web-based debugger to trace the execution of the program.
 977    """
 978    from meerschaum.utils.packages import attempt_import
 979    heartrate = attempt_import('heartrate')
 980    heartrate.trace(files=heartrate.files.all, browser=browser)
 981
 982
 983def items_str(
 984        items: List[Any],
 985        quotes: bool = True,
 986        quote_str: str = "'",
 987        commas: bool = True,
 988        comma_str: str = ',',
 989        and_: bool = True,
 990        and_str: str = 'and',
 991        oxford_comma: bool = True,
 992        spaces: bool = True,
 993        space_str = ' ',
 994    ) -> str:
 995    """
 996    Return a formatted string if list items separated by commas.
 997
 998    Parameters
 999    ----------
1000    items: [List[Any]]
1001        The items to be printed as an English list.
1002
1003    quotes: bool, default True
1004        If `True`, wrap items in quotes.
1005
1006    quote_str: str, default "'"
1007        If `quotes` is `True`, prepend and append each item with this string.
1008
1009    and_: bool, default True
1010        If `True`, include the word 'and' before the final item in the list.
1011
1012    and_str: str, default 'and'
1013        If `and_` is True, insert this string where 'and' normally would in and English list.
1014
1015    oxford_comma: bool, default True
1016        If `True`, include the Oxford Comma (comma before the final 'and').
1017        Only applies when `and_` is `True`.
1018
1019    spaces: bool, default True
1020        If `True`, separate items with `space_str`
1021
1022    space_str: str, default ' '
1023        If `spaces` is `True`, separate items with this string.
1024
1025    Returns
1026    -------
1027    A string of the items as an English list.
1028
1029    Examples
1030    --------
1031    >>> items_str([1,2,3])
1032    "'1', '2', and '3'"
1033    >>> items_str([1,2,3], quotes=False)
1034    '1, 2, and 3'
1035    >>> items_str([1,2,3], and_=False)
1036    "'1', '2', '3'"
1037    >>> items_str([1,2,3], spaces=False, and_=False)
1038    "'1','2','3'"
1039    >>> items_str([1,2,3], oxford_comma=False)
1040    "'1', '2' and '3'"
1041    >>> items_str([1,2,3], quote_str=":")
1042    ':1:, :2:, and :3:'
1043    >>> items_str([1,2,3], and_str="or")
1044    "'1', '2', or '3'"
1045    >>> items_str([1,2,3], space_str="_")
1046    "'1',_'2',_and_'3'"
1047
1048    """
1049    if not items:
1050        return ''
1051    
1052    q = quote_str if quotes else ''
1053    s = space_str if spaces else ''
1054    a = and_str if and_ else ''
1055    c = comma_str if commas else ''
1056
1057    if len(items) == 1:
1058        return q + str(list(items)[0]) + q
1059
1060    if len(items) == 2:
1061        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1062
1063    sep = q + c + s + q
1064    output = q + sep.join(str(i) for i in items[:-1]) + q
1065    if oxford_comma:
1066        output += c
1067    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1068    return output
1069
1070
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Format an interval as a human-readable timespan.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to format. A `timedelta` is measured with `total_seconds()`;
        any other value is multiplied by 60 (i.e. treated as a number of minutes).

    Returns
    -------
    The string produced by `humanfriendly.format_timespan()`.
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')
    if isinstance(delta, timedelta):
        seconds = delta.total_seconds()
    else:
        seconds = delta * 60
    return humanfriendly.format_timespan(seconds)
1092
1093
def is_docker_available() -> bool:
    """
    Report whether `docker ps` runs and exits with status 0.

    Any exception raised while launching the command (e.g. the `docker`
    executable is missing) is treated as "not available".
    """
    import subprocess
    try:
        return_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return return_code == 0
1104
1105
def is_bcp_available() -> bool:
    """
    Report whether the MSSQL `bcp` utility runs (`bcp -v`) and exits with status 0.

    Any exception raised while launching the command (e.g. the `bcp`
    executable is missing) is treated as "not available".
    """
    import subprocess
    try:
        return_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return return_code == 0
1117
1118
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file, like the `tail` command.

    Adapted from
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        Path of the file to read. The file is opened in binary mode
        and each line is decoded with the default codec (UTF-8).

    N: int
        The number of lines to return. If the file has fewer lines
        (or `N` is not positive), every line is returned.

    Returns
    -------
    A list of decoded lines (without newline characters), oldest first.
    If the file ends with a newline, the last element is an empty string.
    """
    import os
    lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at EOF and walk backwards one byte at a time.
        read_obj.seek(0, os.SEEK_END)
        pointer_location = read_obj.tell()
        ### Bytes of the current line, collected in reverse order.
        buffer = bytearray()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Restore the byte order *before* decoding,
                ### otherwise multi-byte UTF-8 sequences are corrupted.
                lines.append(buffer[::-1].decode())
                if len(lines) == N:
                    return lines[::-1]
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        ### Whatever remains in the buffer is the first line of the file.
        if buffer:
            lines.append(buffer[::-1].decode())
    return lines[::-1]
1159
1160
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file
        An open, seekable file object.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines at the very end of the file to skip.

    Returns
    -------
    A tuple of `(lines, has_more)`.
    """
    import os

    avg_line_length = 74
    skip = offset or 0
    to_read = n + skip

    if to_read <= 0:
        ### Nothing was requested: only report whether the file has content.
        f.seek(0, os.SEEK_END)
        return [], f.tell() > 0

    while True:
        try:
            ### The estimate becomes a float after growing, but `seek()` needs an int.
            f.seek(-int(avg_line_length * to_read), os.SEEK_END)
        except OSError:
            ### The file is smaller than the step back (or the stream does not
            ### support end-relative seeks), so read from the beginning instead.
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### Unless we started at the top of the file, the first line read may be
        ### partial, so require one line more than needed before trusting the slice.
        if len(lines) > to_read or pos == 0:
            end = -skip if skip else None
            return lines[-to_read:end], len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
1185
1186
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.
        Items shorter than `max_len` are returned unchanged.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already a single character and the string is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    ### Split on the requested delimiter (not a hard-coded underscore).
    sections = item.split(delimeter)
    ### Reserve room for one delimiter per section.
    available_chars = max_len - (len(delimeter) * len(sections))

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        ### Drop the last character of every section that still has more than one.
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        shortened_len = sum(len(section) for section in sections)
        if shortened_len == sections_len:
            ### No section could be shortened any further.
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = shortened_len

    return delimeter.join(sections)
1234
1235
def separate_negation_values(
        vals: Union[List[str], Tuple[str]],
        negation_prefix: Optional[str] = None,
    ) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        The values to partition.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix are treated as negated.
        If `None`, the prefix is read from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    A tuple of two lists: the included values (unchanged)
    and the excluded values (as strings, with the prefix removed).
    """
    prefix = negation_prefix
    if prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(prefix):
            included.append(val)
            continue
        excluded.append(val_str[len(prefix):])

    return included, excluded
1264
1265
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Split every value of a params dictionary into include- and exclude-lists.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary. Values which are not lists or tuples
        are treated as single-item lists.

    Returns
    -------
    A dictionary mapping each key to the tuple returned by `separate_negation_values()`.
    An empty dictionary is returned when `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params
1296
1297
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Lazily yield the items of a (possibly nested) list, depth-first.
    Only `list` instances are descended into; other containers are yielded as-is.
    """
    for element in list_:
        if not isinstance(element, list):
            yield element
            continue
        yield from flatten_list(element)
1307
1308
def make_symlink(src_path: pathlib.Path, dest_path: pathlib.Path) -> SuccessTuple:
    """
    Create a symlink at `dest_path` which points to `src_path`, with fallbacks for Windows.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path (the link's target).

    dest_path: pathlib.Path
        The destination path (where the link is created).

    Returns
    -------
    A SuccessTuple indicating success.
    """
    ### Nothing to do if the destination already resolves to the source.
    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows gets the fallbacks below.
    import platform
    if platform.system() != 'Windows':
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories may be linked with a junction.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)

    return True, "Success"
1362
1363
def is_symlink(path: pathlib.Path) -> bool:
    """
    Check whether `path` is a symlink, also accepting Windows junctions.

    On Windows, a path which `os.readlink()` can read is considered a link
    even if `path.is_symlink()` is `False`.
    """
    if path.is_symlink():
        return True

    import platform, os
    if platform.system() == 'Windows':
        try:
            return bool(os.readlink(path))
        except OSError:
            return False
    return False
1377
1378
def parametrized(dec):
    """
    Turn `dec(f, *args, **kwargs)` into a decorator factory,
    so that it may be applied as `@dec(*args, **kwargs)`.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*args, **kwargs):
        ### The returned callable receives the function being decorated.
        return lambda f: dec(f, *args, **kwargs)
    return layer
1390
1391
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If a member's name would place it outside of `output_dir`.
        Nothing is extracted in this case.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def is_within_directory(target) -> bool:
        abs_target = os.path.abspath(target)
        try:
            ### `commonpath` compares whole path components.
            ### (`commonprefix` compares characters, so '/tmp/out_evil'
            ### would wrongly count as being inside '/tmp/out'.)
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            ### E.g. paths on different drives can never be nested.
            return False

    ### Validate every member before extracting anything.
    ### NOTE: Only member names are checked here, not link targets.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir)
1422
1423##################
1424# Legacy imports #
1425##################
1426
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _target
    return _target(*args, **kwargs)
1434
1435
def print_options(*args, **kwargs) -> None:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.formatting.print_options`.
    """
    from meerschaum.utils.formatting import print_options as _target
    return _target(*args, **kwargs)
1443
1444
def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _target
    return _target(*args, **kwargs)
1452
1453
def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as _target
    return _target(*args, **kwargs)
1461
1462
def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as _target
    return _target(*args, **kwargs)
1470
1471
def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as _target
    return _target(*args, **kwargs)
1479
1480
def df_from_literal(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as _target
    return _target(*args, **kwargs)
1488
1489
def get_json_cols(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as _target
    return _target(*args, **kwargs)
1497
1498
def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as _target
    return _target(*args, **kwargs)
1506
1507
def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as _target
    return _target(*args, **kwargs)
1515
1516
def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as _target
    return _target(*args, **kwargs)
1524
1525
def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as _target
    return _target(*args, **kwargs)
1533
1534
def choices_docstring(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as _target
    return _target(*args, **kwargs)
1542
1543
def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Legacy shim so that older imports from this module keep working.
    Forwards all arguments to `meerschaum.actions._get_subaction_names`.
    """
    from meerschaum.actions import _get_subaction_names as _target
    return _target(*args, **kwargs)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
def add_method_to_class(
        func: Callable[[Any], Any],
        class_def: 'Class',
        method_name: Optional[str] = None,
        keep_self: Optional[bool] = None,
    ) -> Callable[[Any], Any]:
    """
    Attach `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or object) which receives the new attribute.

    method_name: Optional[str], default None
        The attribute name to use. If `None`, use `func.__name__`.

    keep_self: Optional[bool], default None
        If `False`, or if `None` and `class_def` is a class,
        attach a wrapper which discards its first argument (`self`)
        before calling `func`. Otherwise attach `func` itself.

    Returns
    -------
    The original `func` object.

    """
    from functools import wraps

    if method_name is None:
        method_name = func.__name__

    discard_self = (
        keep_self is False
        or (keep_self is None and isinstance(class_def, type))
    )
    if discard_self:
        @wraps(func)
        def attribute(self, *args, **kw):
            return func(*args, **kw)
    else:
        attribute = func

    setattr(class_def, method_name, attribute)
    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 89def generate_password(length: int = 12) -> str:
 90    """Generate a secure password of given length.
 91
 92    Parameters
 93    ----------
 94    length : int, default 12
 95        The length of the password.
 96
 97    Returns
 98    -------
 99    A random password string.
100
101    """
102    import secrets, string
103    return ''.join((secrets.choice(string.ascii_letters) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
def is_int(s: str) -> bool:
    """
    Check whether a value parses as a float with no fractional part.

    Parameters
    ----------
    s: str
        The string to be checked.

    Returns
    -------
    `True` if `float(s)` succeeds and is a whole number (e.g. `'1'` or `'1.0'`),
    otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        return False
    return as_float.is_integer()

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def string_to_dict(params_string: str) -> Dict[str, Any]:
def string_to_dict(
        params_string: str
    ) -> Dict[str, Any]:
    """
    Parse a string into a dictionary.

    Strings which begin with '{' are parsed as JSON.
    Otherwise the string is read as comma-separated `key:value` pairs,
    where additional colons describe nested keys (`a:b:1` -> `{'a': {'b': 1}}`).

    Parameters
    ----------
    params_string: str
        The string to be parsed.

    Returns
    -------
    The parsed dictionary.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if params_string == "":
        return {}

    import json

    ### Edge case: the generated compose file escapes the value
    ### so that the JSON is wrapped in one extra character on each side.
    is_wrapped_json = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if is_wrapped_json:
        return json.loads(params_string[1:-1])
    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    def _literal_or_str(text):
        ### Interpret Python literals (numbers, quoted strings, etc.),
        ### falling back to the raw text.
        try:
            return ast.literal_eval(text)
        except Exception:
            return str(text)

    parsed = {}
    for pair in params_string.split(","):
        *key_path, raw_val = pair.split(":")
        val = _literal_or_str(raw_val)

        ### Walk (and create) the nested dictionaries for all but the final key.
        cursor = parsed
        for raw_key in key_path[:-1]:
            key = _literal_or_str(raw_key)
            if key not in cursor:
                cursor[key] = {}
            cursor = cursor[key]

        cursor[key_path[-1]] = val

    return parsed

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
def parse_config_substitution(
        value: str,
        leading_key: str = 'MRSM',
        begin_key: str = '{',
        end_key: str = '}',
        delimeter: str = ':'
    ) -> List[Any]:
    """
    Parse Meerschaum substitution syntax.
    E.g. `MRSM{value1:value2}` => `['value1', 'value2']`

    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to parse.

    leading_key: str, default 'MRSM'
        The text which must precede `begin_key`.

    begin_key: str, default '{'
        The opening marker of the substitution.

    end_key: str, default '}'
        The closing marker of the substitution.

    delimeter: str, default ':'
        The separator between the keys inside of the markers.

    Returns
    -------
    The list of keys between the markers.
    If `value` does not follow the substitution syntax, it is returned unchanged.
    """
    prefix = leading_key + begin_key
    is_substitution = (
        len(value) >= len(prefix) + len(end_key)
        and value.startswith(prefix)
        and value.endswith(end_key)
    )
    if not is_substitution:
        return value

    ### Strip the leading key and both markers, then split the remaining keys.
    inner = value[len(prefix):len(value) - len(end_key)]
    return inner.split(delimeter)

Parse Meerschaum substitution syntax, e.g. MRSM{value1:value2} => ['value1', 'value2']. NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
def edit_file(
        path: Union[pathlib.Path, str],
        default_editor: str = 'pyvim',
        debug: bool = False
    ) -> bool:
    """
    Open a file in an editor and wait for the editor to exit.

    The editor named by `$EDITOR` is launched (or `default_editor` if unset).
    If launching raises an exception, fall back to the `pyvim` Python package.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        The editor to launch when `$EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether the editor exited with a return code of 0.

    """
    import os
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor}'...")
        return_code = call([editor, path])
    except Exception as e:
        ### The system editor could not be launched.
        if debug:
            dprint(e)
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        return_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return return_code == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def is_pipe_registered( pipe: meerschaum.core.Pipe.Pipe, pipes: Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]], debug: bool = False) -> bool:
def is_pipe_registered(
        pipe: 'meerschaum.Pipe',
        pipes: PipesDict,
        debug: bool = False
    ) -> bool:
    """
    Check whether a pipe's keys are present in a pipes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `connector_keys`, `metric_key`, and `location_key` are looked up.

    pipes: PipesDict
        The nested dictionary (connector keys -> metric key -> location key) to search.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether all three keys are found at their respective levels.
    """
    from meerschaum.utils.debug import dprint
    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
    if debug:
        dprint(f'{ck}, {mk}, {lk}')
        dprint(f'{pipe}, {pipes}')

    if ck not in pipes:
        return False
    metrics = pipes[ck]
    if mk not in metrics:
        return False
    return lk in metrics[mk]

Check if a Pipe is inside the pipes dictionary.

Parameters
  • pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
  • pipes (PipesDict): The dictionary to search inside.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating whether the pipe is inside the dictionary.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Determine the columns and lines in the terminal.
    If they cannot be determined, fall back to the `COLUMNS` and `LINES`
    environment variables, then to the default values.

    Parameters
    ----------
    default_cols: int, default 100
        If the columns cannot be determined, return this value.

    default_lines: int, default 120
        If the lines cannot be determined, return this value.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    import os

    def _env_int(name: str, default: int) -> int:
        ### An unset or non-integer variable should not crash the caller.
        try:
            return int(os.environ.get(name, str(default)))
        except ValueError:
            return default

    try:
        size = os.get_terminal_size()
        return size.columns, size.lines
    except Exception:
        ### No terminal is attached (e.g. output is piped).
        return _env_int('COLUMNS', default_cols), _env_int('LINES', default_lines)

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Group the items of an iterable into fixed-size tuples.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to iterate over in chunks.

    chunksize: int
        The number of items per chunk.

    fillvalue: Optional[Any], default None
        The value used to pad the final chunk if the items run out.

    Returns
    -------
    An iterator of tuples of size `chunksize`.

    """
    from itertools import zip_longest
    ### Every position of a chunk pulls from the same underlying iterator,
    ### so consecutive items end up side-by-side in one tuple.
    shared_iterator = iter(iterable)
    positions = [shared_iterator for _ in range(chunksize)]
    return zip_longest(*positions, fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Build a new dictionary whose items are ordered by value (ascending).

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary ordered by value.
    If sorting fails (e.g. the values cannot be compared), `d` itself is returned.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        items_by_value = sorted(d.items(), key=lambda pair: pair[1])
        return dict(items_by_value)
    except Exception:
        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]]) -> 'List[Pipe]':
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collect the innermost values of a three-level pipes dictionary into a flat list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The nested pipes dictionary to be flattened.

    Returns
    -------
    A list of the innermost values (the `Pipe` objects), in iteration order.

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
def round_time(
        dt: Optional[datetime] = None,
        date_delta: Optional[timedelta] = None,
        to: 'str' = 'down'
    ) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.
    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

    NOTE: When `dt` is `None`, the current UTC time is used as a timezone-naive datetime.
    A provided `dt` keeps its own `tzinfo` in the result.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any other value is treated like `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    step = (timedelta(minutes=1) if date_delta is None else date_delta).total_seconds()
    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    ### Whole seconds elapsed since midnight (the `.seconds` component only).
    naive_dt = dt.replace(tzinfo=None)
    elapsed = (naive_dt - dt.min.replace(tzinfo=None)).seconds

    already_aligned = elapsed % step == 0 and dt.microsecond == 0
    if to == 'up' and not already_aligned:
        target = (elapsed + dt.microsecond / 1000000 + step) // step * step
    elif to == 'down' and not already_aligned:
        target = elapsed // step * step
    else:
        ### Aligned values and 'closest' share the same half-step formula.
        target = (elapsed + step / 2) // step * step

    return dt + timedelta(seconds=(target - elapsed), microseconds=(-dt.microsecond))

Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

NOTE: When dt is None, the current UTC time is used as a timezone-naive datetime; a provided dt keeps its own timezone information.

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
def timed_input(
        seconds: int = 10,
        timeout_message: str = "",
        prompt: str = "",
        icon: bool = False,
        **kw
    ) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    This relies on `signal.SIGALRM` and `signal.alarm`.
    The previously installed `SIGALRM` handler is restored before returning.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        Accepted for compatibility; currently not used by this function.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        Accepted for compatibility; currently not used by this function.

    Returns
    -------
    The input string entered by the user,
    or `None` if the window elapsed or input could not be read.

    """
    import signal, time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Remember the existing handler so it can be put back afterwards.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(seconds) # produce SIGALRM in `seconds` seconds

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        ### No readable stdin: show the prompt and simply wait out the window.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            return None
    finally:
        signal.alarm(0) # cancel alarm
        ### `signal.signal` returns None for handlers not installed from Python;
        ### those cannot be re-installed, so leave them alone.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    return None

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def replace_pipes_in_dict( pipes: Optional[Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]]] = None, func: "'function'" = <class 'str'>, debug: bool = False, **kw) -> Dict[str, Dict[str, Dict[str, meerschaum.core.Pipe.Pipe]]]:
def replace_pipes_in_dict(
        pipes : Optional[PipesDict] = None,
        func: 'function' = str,
        debug: bool = False,
        **kw
    ) -> PipesDict:
    """
    Return a deep copy of a pipes dictionary with every leaf value passed through `func`.

    Parameters
    ----------
    pipes: Optional[PipesDict], default None
        The pipes dict to be processed.
        If `None`, the dictionary is fetched with `meerschaum.get_pipes(debug=debug, **kw)`.

    func: Callable[[Any], Any], default str
        The function applied to every non-dictionary value.
        Defaults to the string constructor.

    debug: bool, default False
        Verbosity toggle (forwarded to `get_pipes`).

    Returns
    -------
    A new dictionary with the same nesting where each leaf is replaced by `func(leaf)`.
    The input dictionary is not modified.

    """
    import copy

    def _apply_to_leaves(node: Dict[Any, Any]) -> None:
        ### Only existing keys are reassigned, so the dict size never changes mid-iteration.
        for key, value in node.items():
            if not isinstance(value, dict):
                node[key] = func(value)
                continue
            _apply_to_leaves(value)

    if pipes is None:
        from meerschaum import get_pipes
        pipes = get_pipes(debug=debug, **kw)

    replaced = copy.deepcopy(pipes)
    _apply_to_leaves(replaced)
    return replaced

Replace the Pipes in a Pipes dict with the result of another function.

Parameters
  • pipes (Optional[PipesDict], default None): The pipes dict to be processed.
  • func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary where every pipe is replaced with the output of a function.
def enforce_gevent_monkey_patch():
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    from meerschaum.utils.packages import attempt_import
    import socket
    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    ### The standard socket class being gevent's class is the signal that patching already ran.
    already_patched = socket.socket is gevent_socket.socket
    if already_patched:
        return
    gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Check whether a string is a valid email.

    The check is a pragmatic pattern match, not a full RFC 5322 validation:
    an alphanumeric local part (optionally separated by `.`, `_`, `+`, or `-`),
    an `@`, one or more dot-separated domain labels, and a final label of at least two characters.
    The whole string must match (a trailing newline is rejected).

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>
    >>> bool(is_valid_email('Jane.Doe@mail.example.org'))
    True

    """
    import re
    pattern = (
        r'[A-Za-z0-9]+(?:[._+-][A-Za-z0-9]+)*'  ### local part
        r'@[\w-]+(?:\.[\w-]+)*'                 ### domain labels (subdomains allowed)
        r'\.\w{2,}'                             ### top-level label
    )
    ### `fullmatch` (rather than `^...$`) so that 'a@b.com\n' does not slip through.
    return re.fullmatch(pattern, email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
def string_width(string: str, widest: bool = True) -> int:
    """
    Return the length of the longest line in a string.

    Parameters
    ----------
    string: str
        The string to be examined. Lines are separated by `'\\n'`.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer: the character count of the longest line.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    ### `split` always yields at least one element, so `max` never sees an empty sequence.
    return max(len(line) for line in string.split('\n'))

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        This is also applied to nested dictionaries.

    Returns
    -------
    Another dictionary (a deep copy) where values of keys containing `'password'`
    (case-insensitive) are replaced with `replace_with` repeated once per character.
    For keys named `'uri'`, the password portion of `username:password` is masked
    when the URI can be parsed by `SQLConnector.parse_uri`.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy
    masked = copy.deepcopy(d)
    for key, value in d.items():
        if isinstance(value, dict):
            ### Forward `replace_with` so nested passwords use the same mask character.
            masked[key] = replace_password(value, replace_with=replace_with)
            continue

        lowered_key = str(key).lower()
        if 'password' in lowered_key:
            masked[key] = replace_with * len(str(value))
            continue

        if lowered_key != 'uri':
            continue

        ### Imported lazily so dictionaries without a URI never need the SQL connector.
        from meerschaum.connectors.sql import SQLConnector
        try:
            uri_params = SQLConnector.parse_uri(value)
        except Exception:
            uri_params = None
        if not uri_params:
            continue
        if 'username' not in uri_params or 'password' not in uri_params:
            continue

        username, password = uri_params['username'], uri_params['password']
        ### Missing (non-string) credentials cannot be located in the URI text; leave it as-is.
        if not isinstance(username, str) or not isinstance(password, str):
            continue
        masked[key] = value.replace(
            username + ':' + password,
            username + ':' + (replace_with * len(password)),
        )
    return masked

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where the values of keys containing 'password' are replaced with replace_with ('*'), repeated once per character.
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
def filter_keywords(
        func: Callable[[Any], Any],
        **kw: Any
    ) -> Dict[str, Any]:
    """
    Filter out unsupported keywords.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, all of `kw` is returned unfiltered.
    Positional-only and `*args` parameters are never returned,
    because they cannot be passed by keyword.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters

    ### Check the parameter kind rather than its string form:
    ### a default value or annotation containing '**' must not count as `**kwargs`.
    accepts_any_keyword = any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    )
    if accepts_any_keyword:
        return kw

    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return {
        key: value
        for key, value in kw.items()
        if key in func_params and func_params[key].kind in keyword_kinds
    }

Filter out unsupported keywords.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Convert an ordered dict to a plain dict,
    recursing into values which are themselves `OrderedDict` instances.
    Does not mutate the original OrderedDict.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
def remove_ansi(s : str) -> str:
    """
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    The input string with every ANSI escape sequence removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    ansi_escape_pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(ansi_escape_pattern, '', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(*types: str, search_term: str = '', ignore_exact_match=True) -> List[str]:
def get_connector_labels(
        *types: str,
        search_term: str = '',
        ignore_exact_match = True,
    ) -> List[str]:
    """
    Read connector labels from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use every type defined under the
        `meerschaum:connectors` configuration plus `'plugin'`.

    search_term: str, default ''
        Only keys starting with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    Returns
    -------
    A sorted list of `'<type>:<label>'` keys of defined connectors
    (labels named `'default'` are excluded).

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types) or (list(connectors.keys()) + ['plugin'])

    keys = []
    for conn_type in requested_types:
        if conn_type != 'plugin':
            keys.extend(
                f'{conn_type}:{label}'
                for label in connectors.get(conn_type, {})
                if label != 'default'
            )
            continue

        ### Plugin labels come from the data plugins' module names, not the config.
        from meerschaum.plugins import get_data_plugins
        keys.extend(
            f'{conn_type}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        )

    excluded_key = search_term if ignore_exact_match else ''
    return sorted(
        key for key in keys
        if key.startswith(search_term) and key != excluded_key
    )

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Timezone-naive datetimes get a `'Z'` suffix appended;
    timezone-aware datetimes keep the offset produced by `isoformat()`.
    Anything that is not a `datetime` yields `None`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if isinstance(dt, datetime):
        iso_string = dt.isoformat()
        return iso_string + 'Z' if dt.tzinfo is None else iso_string
    return None

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
def wget(
        url: str,
        dest: Optional[Union[str, 'pathlib.Path']] = None,
        headers: Optional[Dict[str, Any]] = None,
        color: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib.request` from the standard library.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory, named after the
        `content-disposition` filename if present, otherwise the last URL segment.

    headers: Optional[Dict[str, Any]], default None
        Request headers to send. Defaults to no extra headers.

    color: bool, default True
        If `True`, report messages through Meerschaum's `dprint` / `error` helpers.
        If `False`, use plain `print` and exit the process with status 1 on failure.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The path to the downloaded file.

    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    import os, pathlib, re, shutil, urllib.request
    if headers is None:
        headers = {}
    request = urllib.request.Request(url, headers=headers)
    if not color:
        dprint = print
    if debug:
        dprint(f"Downloading from '{url}'...")
    try:
        response = urllib.request.urlopen(request)
    except Exception:
        import ssl
        ### SECURITY NOTE: the retry deliberately skips certificate verification
        ### (pre-existing best-effort behavior). The unverified context is scoped to this
        ### single request instead of replacing the process-wide default HTTPS context.
        try:
            response = urllib.request.urlopen(
                request,
                context=ssl._create_unverified_context(),
            )
        except Exception as _e:
            print(_e)
            response = None
    if response is None or response.code != 200:
        if response is not None:
            response.close()
        error_msg = f"Failed to download from '{url}'."
        if color:
            ### NOTE(review): `error` is presumably expected to raise — confirm.
            error(error_msg)
        else:
            print(error_msg)
            import sys
            sys.exit(1)

    ### The context manager closes the connection even if writing the file fails.
    with response:
        disposition = response.headers.get('content-disposition', None)
        filenames = (
            re.findall("filename=(.+)", disposition)
            if disposition is not None
            else []
        )
        ### Fall back to the URL when the header carries no filename.
        fname = filenames[0].strip('"') if filenames else url.split('/')[-1]

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        ### Read through the response object (not the raw `fp`) so the HTTP layer
        ### decodes the body, and stream it to avoid holding the whole file in memory.
        with open(dest, 'wb') as f:
            shutil.copyfileobj(response, f)

    if debug:
        dprint(f"Downloaded file '{dest}'.")

    return dest

Mimic wget using urllib.request from the standard library.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
def async_wrap(func):
    """
    Wrap a synchronous function so it can be awaited.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function runs `func(*args, **kwargs)` via
    `loop.run_in_executor(executor, ...)`. The optional keyword arguments
    `loop` and `executor` are consumed by the wrapper and not passed to `func`.
    """
    import asyncio
    from functools import wraps, partial

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        event_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = partial(func, *args, **kwargs)
        result = await event_loop.run_in_executor(executor, bound_call)
        return result

    return run
def debug_trace(browser: bool = True):
def debug_trace(browser: bool = True):
    """
    Start a `heartrate` trace over all files to follow the execution of the program.

    Parameters
    ----------
    browser: bool, default True
        Forwarded to `heartrate.trace` as its `browser` argument.
    """
    from meerschaum.utils.packages import attempt_import
    heartrate = attempt_import('heartrate')
    all_files = heartrate.files.all
    heartrate.trace(files=all_files, browser=browser)

Open a web-based debugger to trace the execution of the program.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
 984def items_str(
 985        items: List[Any],
 986        quotes: bool = True,
 987        quote_str: str = "'",
 988        commas: bool = True,
 989        comma_str: str = ',',
 990        and_: bool = True,
 991        and_str: str = 'and',
 992        oxford_comma: bool = True,
 993        spaces: bool = True,
 994        space_str = ' ',
 995    ) -> str:
 996    """
 997    Return a formatted string if list items separated by commas.
 998
 999    Parameters
1000    ----------
1001    items: [List[Any]]
1002        The items to be printed as an English list.
1003
1004    quotes: bool, default True
1005        If `True`, wrap items in quotes.
1006
1007    quote_str: str, default "'"
1008        If `quotes` is `True`, prepend and append each item with this string.
1009
1010    and_: bool, default True
1011        If `True`, include the word 'and' before the final item in the list.
1012
1013    and_str: str, default 'and'
1014        If `and_` is True, insert this string where 'and' normally would in and English list.
1015
1016    oxford_comma: bool, default True
1017        If `True`, include the Oxford Comma (comma before the final 'and').
1018        Only applies when `and_` is `True`.
1019
1020    spaces: bool, default True
1021        If `True`, separate items with `space_str`
1022
1023    space_str: str, default ' '
1024        If `spaces` is `True`, separate items with this string.
1025
1026    Returns
1027    -------
1028    A string of the items as an English list.
1029
1030    Examples
1031    --------
1032    >>> items_str([1,2,3])
1033    "'1', '2', and '3'"
1034    >>> items_str([1,2,3], quotes=False)
1035    '1, 2, and 3'
1036    >>> items_str([1,2,3], and_=False)
1037    "'1', '2', '3'"
1038    >>> items_str([1,2,3], spaces=False, and_=False)
1039    "'1','2','3'"
1040    >>> items_str([1,2,3], oxford_comma=False)
1041    "'1', '2' and '3'"
1042    >>> items_str([1,2,3], quote_str=":")
1043    ':1:, :2:, and :3:'
1044    >>> items_str([1,2,3], and_str="or")
1045    "'1', '2', or '3'"
1046    >>> items_str([1,2,3], space_str="_")
1047    "'1',_'2',_and_'3'"
1048
1049    """
1050    if not items:
1051        return ''
1052    
1053    q = quote_str if quotes else ''
1054    s = space_str if spaces else ''
1055    a = and_str if and_ else ''
1056    c = comma_str if commas else ''
1057
1058    if len(items) == 1:
1059        return q + str(list(items)[0]) + q
1060
1061    if len(items) == 2:
1062        return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q
1063
1064    sep = q + c + s + q
1065    output = q + sep.join(str(i) for i in items[:-1]) + q
1066    if oxford_comma:
1067        output += c
1068    output += s + a + (s if and_ else '') + q + str(items[-1]) + q
1069    return output

Return a formatted string of list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in an English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int]) -> str:
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Format a `timedelta` (or an `int` number of minutes) as a human-readable string.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. A value which is not a `timedelta` is treated as minutes.

    Returns
    -------
    The string produced by `humanfriendly.format_timespan` for the interval in seconds.
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')
    if isinstance(delta, timedelta):
        total_seconds = delta.total_seconds()
    else:
        ### Integers are interpreted as minutes.
        total_seconds = delta * 60
    return humanfriendly.format_timespan(total_seconds)

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Runs `docker ps` with its output discarded and reports whether it exited with status 0.
    Any exception (e.g. the executable is missing) counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if we can connect to the Docker engine.

def is_bcp_available() -> bool:
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Runs `bcp -v` with its output discarded and reports whether it exited with status 0.
    Any exception (e.g. the executable is missing) counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if the MSSQL bcp utility is installed.

def get_last_n_lines(file_name: str, N: int):
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` newline-separated pieces of a file, like the `tail` command.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is read backwards in chunks, so only the end of a large file is loaded.

    Parameters
    ----------
    file_name: str
        Path of the file to read. It is opened in binary mode and decoded as UTF-8.

    N: int
        The maximum number of pieces to return.
        If `N` is less than 1, every piece in the file is returned.

    Returns
    -------
    A list of strings in file order, split on `'\\n'` (the newline itself is not included).
    NOTE: a file ending with a newline yields a final empty string,
    which counts toward `N`.

    """
    import os
    chunk_size = 8192
    chunks = []
    newline_count = 0

    with open(file_name, 'rb') as read_obj:
        read_obj.seek(0, os.SEEK_END)
        position = read_obj.tell()
        ### `N` pieces from the end are complete once `N` newlines have been seen.
        while position > 0 and (N <= 0 or newline_count < N):
            read_size = min(chunk_size, position)
            position -= read_size
            read_obj.seek(position)
            chunk = read_obj.read(read_size)
            newline_count += chunk.count(b'\n')
            chunks.append(chunk)

    ### Chunks were collected back-to-front; restore file order before splitting.
    pieces = b''.join(reversed(chunks)).split(b'\n')
    if N > 0 and newline_count >= N:
        ### Each kept piece starts right after a newline, so none is cut mid-line.
        pieces = pieces[-N:]
    elif not pieces[0]:
        ### The whole file was read: an empty leading piece is not reported.
        pieces = pieces[1:]

    ### Decode whole pieces in forward byte order so multi-byte UTF-8 characters stay intact.
    return [piece.decode() for piece in pieces]
def tail(f, n, offset=None):
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f:
        An open, seekable file object. Lines are returned in whatever type
        `f.read()` produces (e.g. `bytes` for a binary file).

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines to skip at the end of the file before the returned lines.

    Returns
    -------
    A tuple of `(lines, has_more)`.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### The estimate becomes a float after growing; `seek` only accepts integers.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### Unless reading began at the start of the file, the first line read may be partial,
        ### so one line more than requested is needed before the result can be trusted.
        if to_read <= 0 or pos == 0 or len(lines) > to_read:
            end = -offset if offset else None
            return lines[-to_read:end], len(lines) > to_read or pos > 0
        ### Not enough lines in the window: assume longer lines and look further back.
        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.
        Strings shorter than `max_len` are returned unchanged.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already one character long (or empty)
        and the string still does not fit.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'

    """
    if len(item) < max_len:
        return item

    # An empty delimiter cannot be split on; treat the string as a single section.
    sections = item.split(delimeter) if delimeter else [item]

    # Reserve one delimiter's worth of room per section.
    available_chars = max_len - (len(sections) * len(delimeter))
    sections_len = sum(len(section) for section in sections)

    while sections_len > available_chars:
        # Trim the last character from every section which still has more than one.
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        shortened_len = sum(len(section) for section in sections)
        if shortened_len == sections_len:
            # No section could be shortened any further.
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = shortened_len

    return delimeter.join(sections)

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
def separate_negation_values(
        vals: Union[List[str], Tuple[str]],
        negation_prefix: Optional[str] = None,
    ) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        The values to inspect. Each value is compared via its `str()` form.

    negation_prefix: Optional[str], default None
        Values beginning with this prefix are placed in the second list
        (with the prefix removed).
        If `None`, the prefix is read from `STATIC_CONFIG`.

    Returns
    -------
    A tuple of two lists: the included values (unmodified)
    and the excluded values (as strings with the prefix stripped).
    """
    if negation_prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    prefix_len = len(negation_prefix)
    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(negation_prefix):
            included.append(val)
            continue
        excluded.append(val_str[prefix_len:])

    return included, excluded

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Build include- and exclude-value lists for every key of a params dictionary.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not lists or tuples are treated as single-item lists.

    Returns
    -------
    A dictionary mapping each key to the tuple returned by
    `separate_negation_values()` (include values, exclude values).
    An empty dictionary is returned if `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Flatten arbitrarily nested lists, yielding the non-list items in order.

    Only `list` instances are descended into; other iterables
    (e.g. tuples or strings) are yielded as-is.

    NOTE: Despite the return annotation, this is a generator function;
    wrap the call in `list()` if a concrete list is needed.

    Parameters
    ----------
    list_: List[Any]
        The (possibly nested) list to flatten.

    Returns
    -------
    A generator over the flattened items.

    Examples
    --------
    >>> list(flatten_list([1, [2, [3, 4]], 5]))
    [1, 2, 3, 4, 5]
    """
    # Track the iterators of the lists being traversed explicitly
    # so that deeply nested input cannot exhaust the recursion limit.
    iterators = [iter(list_)]
    while iterators:
        for item in iterators[-1]:
            if isinstance(item, list):
                # Descend into the nested list; the outer iterator resumes afterwards.
                iterators.append(iter(item))
                break
            yield item
        else:
            # The innermost list is exhausted.
            iterators.pop()

Recursively flatten a list.

def parametrized(dec):
def parametrized(dec):
    """
    Wrap the decorator `dec` so that it may be applied with arguments.

    The wrapped decorator is first called with its arguments,
    and the result is then applied to the function being decorated,
    i.e. `dec(func, *args, **kwargs)`.

    https://stackoverflow.com/a/26151604/9699829
    """
    def accept_arguments(*dec_args, **dec_kwargs):
        def apply_to(func):
            return dec(func, *dec_args, **dec_kwargs)
        return apply_to
    return accept_arguments

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Every member's name is resolved against `output_dir` before anything is extracted;
    if any member would land outside of `output_dir`, nothing is extracted.

    NOTE: Only member names are checked; link targets are not inspected here.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If a member's path resolves to a location outside of `output_dir`.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def is_within_output_dir(target) -> bool:
        """Return whether `target` resolves to a path inside the output directory."""
        abs_target = os.path.abspath(target)
        try:
            # Compare whole path components: `commonprefix` works per-character
            # and would accept a sibling such as `<output_dir>_evil/`.
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            # E.g. paths on different drives share no common path.
            return False

    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_output_dir(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)

Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def choose_subaction(*args, **kwargs) -> Any:
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _actions_choose_subaction
    return _actions_choose_subaction(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choose_subaction.

def to_pandas_dtype(*args, **kwargs) -> Any:
def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _dtypes_to_pandas_dtype
    return _dtypes_to_pandas_dtype(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dtypes.to_pandas_dtype.

def filter_unseen_df(*args, **kwargs) -> Any:
def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as _filter_unseen_df
    return _filter_unseen_df(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.filter_unseen_df.

def add_missing_cols_to_df(*args, **kwargs) -> Any:
def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as _add_missing_cols_to_df
    return _add_missing_cols_to_df(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.add_missing_cols_to_df.

def parse_df_datetimes(*args, **kwargs) -> Any:
def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as _parse_df_datetimes
    return _parse_df_datetimes(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.parse_df_datetimes.

def df_from_literal(*args, **kwargs) -> Any:
def df_from_literal(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as _df_from_literal
    return _df_from_literal(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.df_from_literal.

def get_json_cols(*args, **kwargs) -> Any:
def get_json_cols(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as _get_json_cols
    return _get_json_cols(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.get_json_cols.

def get_unhashable_cols(*args, **kwargs) -> Any:
def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as _get_unhashable_cols
    return _get_unhashable_cols(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.get_unhashable_cols.

def enforce_dtypes(*args, **kwargs) -> Any:
def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as _enforce_dtypes
    return _enforce_dtypes(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.enforce_dtypes.

def get_datetime_bound_from_df(*args, **kwargs) -> Any:
def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import (
        get_datetime_bound_from_df as _get_datetime_bound_from_df,
    )
    return _get_datetime_bound_from_df(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.get_datetime_bound_from_df.

def df_is_chunk_generator(*args, **kwargs) -> Any:
def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as _df_is_chunk_generator
    return _df_is_chunk_generator(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.utils.dataframe.df_is_chunk_generator.

def choices_docstring(*args, **kwargs) -> Any:
def choices_docstring(*args, **kwargs) -> Any:
    """
    Legacy alias kept so that existing imports from this module keep working.
    All arguments are forwarded to `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as _actions_choices_docstring
    return _actions_choices_docstring(*args, **kwargs)

Placeholder function to prevent breaking legacy behavior. See meerschaum.actions.choices_docstring.