meerschaum.utils.misc

Miscellaneous functions go here

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4"""
   5Miscellaneous functions go here
   6"""
   7
   8from __future__ import annotations
   9import sys
  10from datetime import timedelta, datetime, timezone
  11from meerschaum.utils.typing import (
  12    Union,
  13    Any,
  14    Callable,
  15    Optional,
  16    List,
  17    Dict,
  18    SuccessTuple,
  19    Iterable,
  20    PipesDict,
  21    Tuple,
  22    InstanceConnector,
  23    Hashable,
  24    Generator,
  25    Iterator,
  26    TYPE_CHECKING,
  27)
  28import meerschaum as mrsm
  29if TYPE_CHECKING:
  30    import collections
  31
# Per-name overrides for the `pdoc` documentation generator.
# NOTE(review): a value of `False` presumably hides the named object from the
# generated documentation; confirm against the pdoc version in use.
__pdoc__: Dict[str, bool] = {
    'to_pandas_dtype': False,
    'filter_unseen_df': False,
    'add_missing_cols_to_df': False,
    'parse_df_datetimes': False,
    'df_from_literal': False,
    'get_json_cols': False,
    'get_unhashable_cols': False,
    'enforce_dtypes': False,
    'get_datetime_bound_from_df': False,
    'df_is_chunk_generator': False,
    'choices_docstring': False,
    '_get_subaction_names': False,
}
  46
  47
  48def add_method_to_class(
  49        func: Callable[[Any], Any],
  50        class_def: 'Class',
  51        method_name: Optional[str] = None,
  52        keep_self: Optional[bool] = None,
  53    ) -> Callable[[Any], Any]:
  54    """
  55    Add function `func` to class `class_def`.
  56
  57    Parameters
  58    ----------
  59    func: Callable[[Any], Any]
  60        Function to be added as a method of the class
  61
  62    class_def: Class
  63        Class to be modified.
  64
  65    method_name: Optional[str], default None
  66        New name of the method. None will use func.__name__ (default).
  67
  68    Returns
  69    -------
  70    The modified function object.
  71
  72    """
  73    from functools import wraps
  74
  75    is_class = isinstance(class_def, type)
  76    
  77    @wraps(func)
  78    def wrapper(self, *args, **kw):
  79        return func(*args, **kw)
  80
  81    if method_name is None:
  82        method_name = func.__name__
  83
  84    setattr(class_def, method_name, (
  85            wrapper if ((is_class and keep_self is None) or keep_self is False) else func
  86        )
  87    )
  88
  89    return func
  90
  91
  92def generate_password(length: int = 12) -> str:
  93    """Generate a secure password of given length.
  94
  95    Parameters
  96    ----------
  97    length : int, default 12
  98        The length of the password.
  99
 100    Returns
 101    -------
 102    A random password string.
 103
 104    """
 105    import secrets, string
 106    return ''.join((secrets.choice(string.ascii_letters) for i in range(length)))
 107
 108def is_int(s : str) -> bool:
 109    """
 110    Check if string is an int.
 111
 112    Parameters
 113    ----------
 114    s: str
 115        The string to be checked.
 116        
 117    Returns
 118    -------
 119    A bool indicating whether the string was able to be cast to an integer.
 120
 121    """
 122    try:
 123        float(s)
 124    except Exception as e:
 125        return False
 126    
 127    return float(s).is_integer()
 128
 129
 130def string_to_dict(
 131        params_string: str
 132    ) -> Dict[str, Any]:
 133    """
 134    Parse a string into a dictionary.
 135    
 136    If the string begins with '{', parse as JSON. Otherwise use simple parsing.
 137
 138    Parameters
 139    ----------
 140    params_string: str
 141        The string to be parsed.
 142        
 143    Returns
 144    -------
 145    The parsed dictionary.
 146
 147    Examples
 148    --------
 149    >>> string_to_dict("a:1,b:2") 
 150    {'a': 1, 'b': 2}
 151    >>> string_to_dict('{"a": 1, "b": 2}')
 152    {'a': 1, 'b': 2}
 153
 154    """
 155    if params_string == "":
 156        return {}
 157
 158    import json
 159
 160    ### Kind of a weird edge case.
 161    ### In the generated compose file, there is some weird escaping happening,
 162    ### so the string to be parsed starts and ends with a single quote.
 163    if (
 164        isinstance(params_string, str)
 165        and len(params_string) > 4
 166        and params_string[1] == "{"
 167        and params_string[-2] == "}"
 168    ):
 169        return json.loads(params_string[1:-1])
 170    if str(params_string).startswith('{'):
 171        return json.loads(params_string)
 172
 173    import ast
 174    params_dict = {}
 175    for param in params_string.split(","):
 176        _keys = param.split(":")
 177        keys = _keys[:-1]
 178        try:
 179            val = ast.literal_eval(_keys[-1])
 180        except Exception as e:
 181            val = str(_keys[-1])
 182
 183        c = params_dict
 184        for _k in keys[:-1]:
 185            try:
 186                k = ast.literal_eval(_k)
 187            except Exception as e:
 188                k = str(_k)
 189            if k not in c:
 190                c[k] = {}
 191            c = c[k]
 192
 193        c[keys[-1]] = val
 194
 195    return params_dict
 196
 197
 198def parse_config_substitution(
 199        value: str,
 200        leading_key: str = 'MRSM',
 201        begin_key: str = '{',
 202        end_key: str = '}',
 203        delimeter: str = ':'
 204    ) -> List[Any]:
 205    """
 206    Parse Meerschaum substitution syntax
 207    E.g. MRSM{value1:value2} => ['value1', 'value2']
 208    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
 209    """
 210    if not value.beginswith(leading_key):
 211        return value
 212
 213    return leading_key[len(leading_key):][len():-1].split(delimeter)
 214
 215
 216def edit_file(
 217        path: Union[pathlib.Path, str],
 218        default_editor: str = 'pyvim',
 219        debug: bool = False
 220    ) -> bool:
 221    """
 222    Open a file for editing.
 223    
 224    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
 225
 226    Parameters
 227    ----------
 228    path: Union[pathlib.Path, str]
 229        The path to the file to be edited.
 230        
 231    default_editor: str, default 'pyvim'
 232        If `$EDITOR` is not set, use this instead.
 233        If `pyvim` is not installed, it will install it from PyPI.
 234
 235    debug: bool, default False
 236        Verbosity toggle.
 237
 238    Returns
 239    -------
 240    A bool indicating the file was successfully edited.
 241    """
 242    import os
 243    from subprocess import call
 244    from meerschaum.utils.debug import dprint
 245    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
 246    try:
 247        EDITOR = os.environ.get('EDITOR', default_editor)
 248        if debug:
 249            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
 250        rc = call([EDITOR, path])
 251    except Exception as e: ### can't open with default editors
 252        if debug:
 253            dprint(e)
 254            dprint('Failed to open file with system editor. Falling back to pyvim...')
 255        pyvim = attempt_import('pyvim', lazy=False)
 256        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
 257    return rc == 0
 258
 259
 260def is_pipe_registered(
 261        pipe: mrsm.Pipe,
 262        pipes: PipesDict,
 263        debug: bool = False
 264    ) -> bool:
 265    """
 266    Check if a Pipe is inside the pipes dictionary.
 267
 268    Parameters
 269    ----------
 270    pipe: meerschaum.Pipe
 271        The pipe to see if it's in the dictionary.
 272        
 273    pipes: PipesDict
 274        The dictionary to search inside.
 275        
 276    debug: bool, default False
 277        Verbosity toggle.
 278
 279    Returns
 280    -------
 281    A bool indicating whether the pipe is inside the dictionary.
 282    """
 283    from meerschaum.utils.debug import dprint
 284    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
 285    if debug:
 286        dprint(f'{ck}, {mk}, {lk}')
 287        dprint(f'{pipe}, {pipes}')
 288    return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
 289
 290
 291def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
 292    """
 293    Determine the columns and lines in the terminal.
 294    If they cannot be determined, return the default values (100 columns and 120 lines).
 295
 296    Parameters
 297    ----------
 298    default_cols: int, default 100
 299        If the columns cannot be determined, return this value.
 300
 301    default_lines: int, default 120
 302        If the lines cannot be determined, return this value.
 303
 304    Returns
 305    -------
 306    A tuple if integers for the columns and lines.
 307    """
 308    import os
 309    try:
 310        size = os.get_terminal_size()
 311        _cols, _lines = size.columns, size.lines
 312    except Exception as e:
 313        _cols, _lines = (
 314            int(os.environ.get('COLUMNS', str(default_cols))),
 315            int(os.environ.get('LINES', str(default_lines))),
 316        )
 317    return _cols, _lines
 318
 319
 320def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
 321    """
 322    Iterate over a list in chunks.
 323    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
 324
 325    Parameters
 326    ----------
 327    iterable: Iterable[Any]
 328        The iterable to iterate over in chunks.
 329        
 330    chunksize: int
 331        The size of chunks to iterate with.
 332        
 333    fillvalue: Optional[Any], default None
 334        If the chunks do not evenly divide into the iterable, pad the end with this value.
 335
 336    Returns
 337    -------
 338    A generator of tuples of size `chunksize`.
 339
 340    """
 341    from itertools import zip_longest
 342    args = [iter(iterable)] * chunksize
 343    return zip_longest(*args, fillvalue=fillvalue)
 344
 345def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
 346    """
 347    Sort a dictionary's values and return a new dictionary.
 348
 349    Parameters
 350    ----------
 351    d: Dict[Any, Any]
 352        The dictionary to be sorted.
 353
 354    Returns
 355    -------
 356    A sorted dictionary.
 357
 358    Examples
 359    --------
 360    >>> sorted_dict({'b': 1, 'a': 2})
 361    {'b': 1, 'a': 2}
 362    >>> sorted_dict({'b': 2, 'a': 1})
 363    {'a': 1, 'b': 2}
 364
 365    """
 366    try:
 367        return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])}
 368    except Exception as e:
 369        return d
 370
 371def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
 372    """
 373    Convert the standard pipes dictionary into a list.
 374
 375    Parameters
 376    ----------
 377    pipes_dict: PipesDict
 378        The pipes dictionary to be flattened.
 379
 380    Returns
 381    -------
 382    A list of `Pipe` objects.
 383
 384    """
 385    pipes_list = []
 386    for ck in pipes_dict.values():
 387        for mk in ck.values():
 388            pipes_list += list(mk.values())
 389    return pipes_list
 390
 391
 392def round_time(
 393        dt: Optional[datetime] = None,
 394        date_delta: Optional[timedelta] = None,
 395        to: 'str' = 'down'
 396    ) -> datetime:
 397    """
 398    Round a datetime object to a multiple of a timedelta.
 399    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
 400
 401    NOTE: This function strips timezone information!
 402
 403    Parameters
 404    ----------
 405    dt: Optional[datetime], default None
 406        If `None`, grab the current UTC datetime.
 407
 408    date_delta: Optional[timedelta], default None
 409        If `None`, use a delta of 1 minute.
 410
 411    to: 'str', default 'down'
 412        Available options are `'up'`, `'down'`, and `'closest'`.
 413
 414    Returns
 415    -------
 416    A rounded `datetime` object.
 417
 418    Examples
 419    --------
 420    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
 421    datetime.datetime(2022, 1, 1, 12, 15)
 422    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
 423    datetime.datetime(2022, 1, 1, 12, 16)
 424    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
 425    datetime.datetime(2022, 1, 1, 12, 0)
 426    >>> round_time(
 427    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
 428    ...   timedelta(hours=1),
 429    ...   to = 'closest'
 430    ... )
 431    datetime.datetime(2022, 1, 1, 12, 0)
 432    >>> round_time(
 433    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
 434    ...   datetime.timedelta(hours=1),
 435    ...   to = 'closest'
 436    ... )
 437    datetime.datetime(2022, 1, 1, 13, 0)
 438
 439    """
 440    if date_delta is None:
 441        date_delta = timedelta(minutes=1)
 442    round_to = date_delta.total_seconds()
 443    if dt is None:
 444        dt = datetime.now(timezone.utc).replace(tzinfo=None)
 445    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds
 446
 447    if seconds % round_to == 0 and dt.microsecond == 0:
 448        rounding = (seconds + round_to / 2) // round_to * round_to
 449    else:
 450        if to == 'up':
 451            rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to
 452        elif to == 'down':
 453            rounding = seconds // round_to * round_to
 454        else:
 455            rounding = (seconds + round_to / 2) // round_to * round_to
 456
 457    return dt + timedelta(0, rounding - seconds, - dt.microsecond)
 458
 459
 460def timed_input(
 461        seconds: int = 10,
 462        timeout_message: str = "",
 463        prompt: str = "",
 464        icon: bool = False,
 465        **kw
 466    ) -> Union[str, None]:
 467    """
 468    Accept user input only for a brief period of time.
 469
 470    Parameters
 471    ----------
 472    seconds: int, default 10
 473        The number of seconds to wait.
 474
 475    timeout_message: str, default ''
 476        The message to print after the window has elapsed.
 477
 478    prompt: str, default ''
 479        The prompt to print during the window.
 480
 481    icon: bool, default False
 482        If `True`, print the configured input icon.
 483
 484
 485    Returns
 486    -------
 487    The input string entered by the user.
 488
 489    """
 490    import signal, time
 491
 492    class TimeoutExpired(Exception):
 493        """Raise this exception when the timeout is reached."""
 494
 495    def alarm_handler(signum, frame):
 496        raise TimeoutExpired
 497
 498    # set signal handler
 499    signal.signal(signal.SIGALRM, alarm_handler)
 500    signal.alarm(seconds) # produce SIGALRM in `timeout` seconds
 501
 502    try:
 503        return input(prompt)
 504    except TimeoutExpired:
 505        return None
 506    except (EOFError, RuntimeError):
 507        try:
 508            print(prompt)
 509            time.sleep(seconds)
 510        except TimeoutExpired:
 511            return None
 512    finally:
 513        signal.alarm(0) # cancel alarm
 514
 515
 516
 517
 518
 519def replace_pipes_in_dict(
 520        pipes : Optional[PipesDict] = None,
 521        func: 'function' = str,
 522        debug: bool = False,
 523        **kw
 524    ) -> PipesDict:
 525    """
 526    Replace the Pipes in a Pipes dict with the result of another function.
 527
 528    Parameters
 529    ----------
 530    pipes: Optional[PipesDict], default None
 531        The pipes dict to be processed.
 532
 533    func: Callable[[Any], Any], default str
 534        The function to be applied to every pipe.
 535        Defaults to the string constructor.
 536
 537    debug: bool, default False
 538        Verbosity toggle.
 539    
 540
 541    Returns
 542    -------
 543    A dictionary where every pipe is replaced with the output of a function.
 544
 545    """
 546    import copy
 547    def change_dict(d : Dict[Any, Any], func : 'function') -> None:
 548        for k, v in d.items():
 549            if isinstance(v, dict):
 550                change_dict(v, func)
 551            else:
 552                d[k] = func(v)
 553
 554    if pipes is None:
 555        from meerschaum import get_pipes
 556        pipes = get_pipes(debug=debug, **kw)
 557
 558    result = copy.deepcopy(pipes)
 559    change_dict(result, func)
 560    return result
 561
 562def enforce_gevent_monkey_patch():
 563    """
 564    Check if gevent monkey patching is enabled, and if not, then apply patching.
 565    """
 566    from meerschaum.utils.packages import attempt_import
 567    import socket
 568    gevent, gevent_socket, gevent_monkey = attempt_import(
 569        'gevent', 'gevent.socket', 'gevent.monkey'
 570    )
 571    if not socket.socket is gevent_socket.socket:
 572        gevent_monkey.patch_all()
 573
 574def is_valid_email(email: str) -> Union['re.Match', None]:
 575    """
 576    Check whether a string is a valid email.
 577
 578    Parameters
 579    ----------
 580    email: str
 581        The string to be examined.
 582        
 583    Returns
 584    -------
 585    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
 586
 587    Examples
 588    --------
 589    >>> is_valid_email('foo')
 590    >>> is_valid_email('foo@foo.com')
 591    <re.Match object; span=(0, 11), match='foo@foo.com'>
 592
 593    """
 594    import re
 595    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
 596    return re.search(regex, email)
 597
 598
 599def string_width(string: str, widest: bool = True) -> int:
 600    """
 601    Calculate the width of a string, either by its widest or last line.
 602
 603    Parameters
 604    ----------
 605    string: str:
 606        The string to be examined.
 607        
 608    widest: bool, default True
 609        No longer used because `widest` is always assumed to be true.
 610
 611    Returns
 612    -------
 613    An integer for the text's visual width.
 614
 615    Examples
 616    --------
 617    >>> string_width('a')
 618    1
 619    >>> string_width('a\\nbc\\nd')
 620    2
 621
 622    """
 623    def _widest():
 624        words = string.split('\n')
 625        max_length = 0
 626        for w in words:
 627            length = len(w)
 628            if length > max_length:
 629                max_length = length
 630        return max_length
 631
 632    return _widest()
 633
 634def _pyinstaller_traverse_dir(
 635        directory: str,
 636        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
 637        include_dotfiles: bool = False
 638    ) -> list:
 639    """
 640    Recursively traverse a directory and return a list of its contents.
 641    """
 642    import os, pathlib
 643    paths = []
 644    _directory = pathlib.Path(directory)
 645
 646    def _found_pattern(name: str):
 647        for pattern in ignore_patterns:
 648            if pattern.replace('/', os.path.sep) in str(name):
 649                return True
 650        return False
 651
 652    for root, dirs, files in os.walk(_directory):
 653        _root = str(root)[len(str(_directory.parent)):]
 654        if _root.startswith(os.path.sep):
 655            _root = _root[len(os.path.sep):]
 656        if _root.startswith('.') and not include_dotfiles:
 657            continue
 658        ### ignore certain patterns
 659        if _found_pattern(_root):
 660            continue
 661
 662        for filename in files:
 663            if filename.startswith('.') and not include_dotfiles:
 664                continue
 665            path = os.path.join(root, filename)
 666            if _found_pattern(path):
 667                continue
 668
 669            _path = str(path)[len(str(_directory.parent)):]
 670            if _path.startswith(os.path.sep):
 671                _path = _path[len(os.path.sep):]
 672            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
 673
 674            paths.append((path, _path))
 675    return paths
 676
 677
 678def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
 679    """
 680    Recursively replace passwords in a dictionary.
 681
 682    Parameters
 683    ----------
 684    d: Dict[str, Any]
 685        The dictionary to search through.
 686
 687    replace_with: str, default '*'
 688        The string to replace each character of the password with.
 689
 690    Returns
 691    -------
 692    Another dictionary where values to the keys `'password'`
 693    are replaced with `replace_with` (`'*'`).
 694
 695    Examples
 696    --------
 697    >>> replace_password({'a': 1})
 698    {'a': 1}
 699    >>> replace_password({'password': '123'})
 700    {'password': '***'}
 701    >>> replace_password({'nested': {'password': '123'}})
 702    {'nested': {'password': '***'}}
 703    >>> replace_password({'password': '123'}, replace_with='!')
 704    {'password': '!!!'}
 705
 706    """
 707    import copy
 708    _d = copy.deepcopy(d)
 709    for k, v in d.items():
 710        if isinstance(v, dict):
 711            _d[k] = replace_password(v)
 712        elif 'password' in str(k).lower():
 713            _d[k] = ''.join([replace_with for char in str(v)])
 714        elif str(k).lower() == 'uri':
 715            from meerschaum.connectors.sql import SQLConnector
 716            try:
 717                uri_params = SQLConnector.parse_uri(v)
 718            except Exception as e:
 719                uri_params = None
 720            if not uri_params:
 721                continue
 722            if not 'username' in uri_params or not 'password' in uri_params:
 723                continue
 724            _d[k] = v.replace(
 725                uri_params['username'] + ':' + uri_params['password'],
 726                uri_params['username'] + ':' + ''.join(
 727                    [replace_with for char in str(uri_params['password'])]
 728                )
 729            )
 730    return _d
 731
 732
 733def filter_arguments(
 734    func: Callable[[Any], Any],
 735    *args: Any,
 736    **kwargs: Any
 737) -> Tuple[Tuple[Any], Dict[str, Any]]:
 738    """
 739    Filter out unsupported positional and keyword arguments.
 740
 741    Parameters
 742    ----------
 743    func: Callable[[Any], Any]
 744        The function to inspect.
 745
 746    *args: Any
 747        Positional arguments to filter and pass to `func`.
 748
 749    **kwargs
 750        Keyword arguments to filter and pass to `func`.
 751
 752    Returns
 753    -------
 754    The `args` and `kwargs` accepted by `func`.
 755    """
 756    args = filter_positionals(func, *args)
 757    kwargs = filter_keywords(func, **kwargs)
 758    return args, kwargs
 759
 760
 761def filter_keywords(
 762    func: Callable[[Any], Any],
 763    **kw: Any
 764) -> Dict[str, Any]:
 765    """
 766    Filter out unsupported keyword arguments.
 767
 768    Parameters
 769    ----------
 770    func: Callable[[Any], Any]
 771        The function to inspect.
 772
 773    **kw: Any
 774        The arguments to be filtered and passed into `func`.
 775
 776    Returns
 777    -------
 778    A dictionary of keyword arguments accepted by `func`.
 779
 780    Examples
 781    --------
 782    ```python
 783    >>> def foo(a=1, b=2):
 784    ...     return a * b
 785    >>> filter_keywords(foo, a=2, b=4, c=6)
 786    {'a': 2, 'b': 4}
 787    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
 788    8
 789    ```
 790
 791    """
 792    import inspect
 793    func_params = inspect.signature(func).parameters
 794    ### If the function has a **kw method, skip filtering.
 795    for param, _type in func_params.items():
 796        if '**' in str(_type):
 797            return kw
 798    return {k: v for k, v in kw.items() if k in func_params}
 799
 800
 801def filter_positionals(
 802    func: Callable[[Any], Any],
 803    *args: Any
 804) -> Tuple[Any]:
 805    """
 806    Filter out unsupported positional arguments.
 807
 808    Parameters
 809    ----------
 810    func: Callable[[Any], Any]
 811        The function to inspect.
 812
 813    *args: Any
 814        The arguments to be filtered and passed into `func`.
 815        NOTE: If the function signature expects more arguments than provided,
 816        the missing slots will be filled with `None`.
 817
 818    Returns
 819    -------
 820    A tuple of positional arguments accepted by `func`.
 821
 822    Examples
 823    --------
 824    ```python
 825    >>> def foo(a, b):
 826    ...     return a * b
 827    >>> filter_positionals(foo, 2, 4, 6)
 828    (2, 4)
 829    >>> foo(*filter_positionals(foo, 2, 4, 6))
 830    8
 831    ```
 832
 833    """
 834    import inspect
 835    from meerschaum.utils.warnings import warn
 836    func_params = inspect.signature(func).parameters
 837    acceptable_args: List[Any] = []
 838
 839    def _warn_invalids(_num_invalid):
 840        if _num_invalid > 0:
 841            warn(
 842                "Too few arguments were provided. "
 843                + f"{_num_invalid} argument"
 844                + ('s have ' if _num_invalid != 1 else " has ")
 845                + " been filled with `None`.",
 846            )
 847
 848    num_invalid: int = 0
 849    for i, (param, val) in enumerate(func_params.items()):
 850        if '=' in str(val) or '*' in str(val):
 851            _warn_invalids(num_invalid)
 852            return tuple(acceptable_args)
 853
 854        try:
 855            acceptable_args.append(args[i])
 856        except IndexError:
 857            acceptable_args.append(None)
 858            num_invalid += 1
 859
 860    _warn_invalids(num_invalid)
 861    return tuple(acceptable_args)
 862
 863
 864def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
 865    """
 866    Convert an ordered dict to a dict.
 867    Does not mutate the original OrderedDict.
 868    """
 869    from collections import OrderedDict
 870    _d = dict(od)
 871    for k, v in od.items():
 872        if isinstance(v, OrderedDict) or (
 873            issubclass(type(v), OrderedDict)
 874        ):
 875            _d[k] = dict_from_od(v)
 876    return _d
 877
 878def remove_ansi(s : str) -> str:
 879    """
 880    Remove ANSI escape characters from a string.
 881
 882    Parameters
 883    ----------
 884    s: str:
 885        The string to be cleaned.
 886
 887    Returns
 888    -------
 889    A string with the ANSI characters removed.
 890
 891    Examples
 892    --------
 893    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
 894    'Hello, World!'
 895
 896    """
 897    import re
 898    return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
 899
 900
 901def get_connector_labels(
 902        *types: str,
 903        search_term: str = '',
 904        ignore_exact_match = True,
 905    ) -> List[str]:
 906    """
 907    Read connector labels from the configuration dictionary.
 908
 909    Parameters
 910    ----------
 911    *types: str
 912        The connector types.
 913        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
 914
 915    search_term: str, default ''
 916        A filter on the connectors' labels.
 917
 918    ignore_exact_match: bool, default True
 919        If `True`, skip a connector if the search_term is an exact match.
 920
 921    Returns
 922    -------
 923    A list of the keys of defined connectors.
 924
 925    """
 926    from meerschaum.config import get_config
 927    connectors = get_config('meerschaum', 'connectors')
 928
 929    _types = list(types)
 930    if len(_types) == 0:
 931        _types = list(connectors.keys()) + ['plugin']
 932
 933    conns = []
 934    for t in _types:
 935        if t == 'plugin':
 936            from meerschaum.plugins import get_data_plugins
 937            conns += [
 938                f'{t}:' + plugin.module.__name__.split('.')[-1]
 939                for plugin in get_data_plugins()
 940            ]
 941            continue
 942        conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ]
 943
 944    possibilities = [
 945        c for c in conns
 946            if c.startswith(search_term)
 947                and c != (
 948                    search_term if ignore_exact_match else ''
 949                )
 950    ]
 951    return sorted(possibilities)
 952
 953
 954def json_serialize_datetime(dt: datetime) -> Union[str, None]:
 955    """
 956    Serialize a datetime object into JSON (ISO format string).
 957    
 958    Examples
 959    --------
 960    >>> import json
 961    >>> from datetime import datetime
 962    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
 963    '{"a": "2022-01-01T00:00:00Z"}'
 964
 965    """
 966    if not isinstance(dt, datetime):
 967        return None
 968    tz_suffix = 'Z' if dt.tzinfo is None else ''
 969    return dt.isoformat() + tz_suffix
 970
 971
 972def wget(
 973        url: str,
 974        dest: Optional[Union[str, 'pathlib.Path']] = None,
 975        headers: Optional[Dict[str, Any]] = None,
 976        color: bool = True,
 977        debug: bool = False,
 978        **kw: Any
 979    ) -> 'pathlib.Path':
 980    """
 981    Mimic `wget` with `requests`.
 982
 983    Parameters
 984    ----------
 985    url: str
 986        The URL to the resource to be downloaded.
 987        
 988    dest: Optional[Union[str, pathlib.Path]], default None
 989        The destination path of the downloaded file.
 990        If `None`, save to the current directory.
 991
 992    color: bool, default True
 993        If `debug` is `True`, print color output.
 994
 995    debug: bool, default False
 996        Verbosity toggle.
 997
 998    Returns
 999    -------
1000    The path to the downloaded file.
1001
1002    """
1003    from meerschaum.utils.warnings import warn, error
1004    from meerschaum.utils.debug import dprint
1005    import os, pathlib, re, urllib.request
1006    if headers is None:
1007        headers = {}
1008    request = urllib.request.Request(url, headers=headers)
1009    if not color:
1010        dprint = print
1011    if debug:
1012        dprint(f"Downloading from '{url}'...")
1013    try:
1014        response = urllib.request.urlopen(request)
1015    except Exception as e:
1016        import ssl
1017        ssl._create_default_https_context = ssl._create_unverified_context
1018        try:
1019            response = urllib.request.urlopen(request)
1020        except Exception as _e:
1021            print(_e)
1022            response = None
1023    if response is None or response.code != 200:
1024        error_msg = f"Failed to download from '{url}'."
1025        if color:
1026            error(error_msg)
1027        else:
1028            print(error_msg)
1029            import sys
1030            sys.exit(1)
1031
1032    d = response.headers.get('content-disposition', None)
1033    fname = (
1034        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1035        else url.split('/')[-1]
1036    )
1037
1038    if dest is None:
1039        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1040    elif isinstance(dest, str):
1041        dest = pathlib.Path(dest)
1042
1043    with open(dest, 'wb') as f:
1044        f.write(response.fp.read())
1045
1046    if debug:
1047        dprint(f"Downloaded file '{dest}'.")
1048
1049    return dest
1050
1051
def async_wrap(func):
    """
    Turn a synchronous function into a coroutine function.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments,
    `loop` and `executor`, and runs `func` via `loop.run_in_executor`.
    If `loop` is `None`, `asyncio.get_event_loop()` is used.
    """
    import asyncio
    import functools

    @functools.wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        target_loop = asyncio.get_event_loop() if loop is None else loop
        ### `run_in_executor` only forwards positional arguments, so bind everything first.
        bound_call = functools.partial(func, *args, **kwargs)
        outcome = await target_loop.run_in_executor(executor, bound_call)
        return outcome

    return run
1067
1068
def debug_trace(browser: bool = True):
    """
    Open a debugger to trace the execution of the program.

    This simply forwards to `meerschaum.utils.debug.trace`.

    Parameters
    ----------
    browser: bool, default True
        Passed through as the `browser` keyword argument of `trace`.
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
1077
1078
def items_str(
        items: List[Any],
        quotes: bool = True,
        quote_str: str = "'",
        commas: bool = True,
        comma_str: str = ',',
        and_: bool = True,
        and_str: str = 'and',
        oxford_comma: bool = True,
        spaces: bool = True,
        space_str: str = ' ',
    ) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable is accepted (e.g. sets, dictionary keys, generators).

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, place this string after each item that is followed by another.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"

    """
    ### Falsy inputs (`None`, empty containers) yield an empty string.
    if not items:
        return ''

    ### Materialize once so that non-indexable iterables behave like lists.
    items = list(items)
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''
    quoted = [q + str(item) + q for item in items]

    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item is simply separated by the comma and space.
    if not and_:
        return (c + s).join(quoted)

    ### Two items are joined by the conjunction alone (no comma).
    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]
1165
1166
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Format a `timedelta` (or a number of minutes) as a human-readable string.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to format.
        Anything which is not a `timedelta` is multiplied by 60 and treated as seconds
        (i.e. the value is interpreted as minutes).

    Returns
    -------
    The result of `humanfriendly.format_timespan` for the interval's seconds.
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')

    if isinstance(delta, timedelta):
        total_seconds = delta.total_seconds()
    else:
        ### Minutes to seconds.
        total_seconds = delta * 60

    return humanfriendly.format_timespan(total_seconds)
1188
1189
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Returns
    -------
    `True` if `docker ps` exits with code 0,
    otherwise `False` (including when the command cannot be launched).
    """
    import subprocess

    command = ['docker', 'ps']
    try:
        ### Silence both streams; only the exit code matters.
        return_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return return_code == 0
1200
1201
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    The check is whether the `sys` module exposes `getandroidapilevel`.
    """
    import sys as _sys
    has_api_level = hasattr(_sys, 'getandroidapilevel')
    return has_api_level
1206
1207
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Returns
    -------
    `True` if `bcp -v` exits with code 0,
    otherwise `False` (including when the command cannot be launched).
    """
    import subprocess

    command = ['bcp', '-v']
    try:
        ### Silence both streams; only the exit code matters.
        return_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return return_code == 0
1219
1220
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file, like the `tail` command.
    Adapted from:
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path to the file to read (opened in binary mode).

    N: int
        The maximum number of lines to return.
        Values less than 1 return an empty list.

    Returns
    -------
    A list of at most `N` strings (decoded with the default codec, newlines removed),
    in the order in which they appear in the file.

    Lines are delimited by `\\n` only. A file which ends with a newline
    therefore contributes a final empty string, and an empty first line is omitted.
    """
    import os

    if N <= 0:
        return []

    newline = b'\n'
    chunk_size = 8192
    chunks = []
    newlines_seen = 0

    with open(file_name, 'rb') as read_obj:
        read_obj.seek(0, os.SEEK_END)
        position = read_obj.tell()
        ### Walk backwards in blocks until enough line breaks have been collected
        ### (or the top of the file is reached).
        while position > 0 and newlines_seen < N:
            step = min(chunk_size, position)
            position -= step
            read_obj.seek(position)
            chunk = read_obj.read(step)
            newlines_seen += chunk.count(newline)
            chunks.append(chunk)

    ### Blocks were gathered back-to-front; restore file order before splitting.
    ### Splitting the raw bytes first keeps multi-byte characters intact when decoding.
    pieces = b''.join(reversed(chunks)).split(newline)

    ### An empty piece at the very top of the file is not a line of its own.
    if position == 0 and pieces[0] == b'':
        pieces = pieces[1:]

    ### If reading stopped early, the first piece may be partial,
    ### but at least `N` complete pieces follow it.
    return [piece.decode() for piece in pieces[-N:]]
1261
1262
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file
        An open, seekable file object.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        Skip this many lines from the end of the file before taking `n` lines.

    Returns
    -------
    A tuple of the list of lines (as returned by `splitlines()`) and the `has_more` flag.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### The estimate becomes a float once it grows, but `seek` requires an integer.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### When the window does not begin at the top of the file, its first line
        ### may be cut off, so one extra line is required before trusting the window.
        ### (Nothing needs to be read when `to_read` is not positive.)
        if pos == 0 or len(lines) > to_read or to_read <= 0:
            end = -offset if offset else None
            has_more = len(lines) > to_read or pos > 0
            return lines[-to_read:end], has_more
        ### Not enough lines yet: assume longer lines and look further back.
        avg_line_length *= 1.3
1287
1288
1289def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
1290    """
1291    Remove characters from each section of a string until the length is within the limit.
1292
1293    Parameters
1294    ----------
1295    item: str
1296        The item name to be truncated.
1297
1298    delimeter: str, default '_'
1299        Split `item` by this string into several sections.
1300
1301    max_len: int, default 128
1302        The max acceptable length of the truncated version of `item`.
1303
1304    Returns
1305    -------
1306    The truncated string.
1307
1308    Examples
1309    --------
1310    >>> truncate_string_sections('abc_def_ghi', max_len=10)
1311    'ab_de_gh'
1312
1313    """
1314    if len(item) < max_len:
1315        return item
1316
1317    def _shorten(s: str) -> str:
1318        return s[:-1] if len(s) > 1 else s
1319
1320    sections = list(enumerate(item.split('_')))
1321    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
1322    available_chars = max_len - len(sections)
1323
1324    _sections = [(i, s) for i, s in sorted_sections]
1325    _sections_len = sum([len(s) for i, s in _sections])
1326    _old_sections_len = _sections_len
1327    while _sections_len > available_chars:
1328        _sections = [(i, _shorten(s)) for i, s in _sections]
1329        _old_sections_len = _sections_len
1330        _sections_len = sum([len(s) for i, s in _sections])
1331        if _old_sections_len == _sections_len:
1332            raise Exception(f"String could not be truncated: '{item}'")
1333
1334    new_sections = sorted(_sections, key=lambda x: x[0])
1335    return delimeter.join([s for i, s in new_sections])
1336
1337
def separate_negation_values(
        vals: Union[List[str], Tuple[str]],
        negation_prefix: Optional[str] = None,
    ) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        The values to parse.

    negation_prefix: Optional[str], default None
        Values whose string form begins with this prefix are treated as negated.
        If `None`, the prefix is read from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    A tuple of two lists: the positive values (unmodified) and the negated values
    (as strings with the prefix removed).
    """
    if negation_prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    for value in vals:
        value_str = str(value)
        if not value_str.startswith(negation_prefix):
            ### Positive values keep their original type.
            included.append(value)
            continue
        excluded.append(value_str[len(negation_prefix):])

    return included, excluded
1366
1367
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Convert a params dictionary into include- and exclude-lists per key.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not lists or tuples are treated as single-item lists.

    Returns
    -------
    A dictionary mapping each key to the tuple returned by
    `separate_negation_values` (include-values, exclude-values).
    An empty dictionary is returned when `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params
1398
1399
def flatten_list(list_: List[Any]) -> Generator[Any, None, None]:
    """
    Lazily flatten a list of arbitrarily nested lists.

    Parameters
    ----------
    list_: List[Any]
        The (possibly nested) list to flatten.
        Only `list` instances are descended into; other containers are yielded as-is.

    Returns
    -------
    A generator which yields the non-list items in depth-first order.

    Examples
    --------
    >>> list(flatten_list([1, [2, [3, 4]], 5]))
    [1, 2, 3, 4, 5]
    """
    ### An explicit stack of iterators (rather than recursion)
    ### keeps very deep nesting from hitting the interpreter's recursion limit.
    stack = [iter(list_)]
    while stack:
        for item in stack[-1]:
            if isinstance(item, list):
                ### Descend into the nested list and resume the parent afterwards.
                stack.append(iter(item))
                break
            yield item
        else:
            ### The innermost iterator is exhausted.
            stack.pop()
1409
1410
def make_symlink(src_path: pathlib.Path, dest_path: pathlib.Path) -> SuccessTuple:
    """
    Create a symlink with `pathlib.Path.symlink_to`, with fallbacks for Windows.

    Parameters
    ----------
    src_path: pathlib.Path
        The path the link should point to.

    dest_path: pathlib.Path
        The path at which to create the link.

    Returns
    -------
    A SuccessTuple indicating success.
    If `symlink_to` fails on Windows, a junction is created for directories
    and files are copied instead.
    """
    ### Nothing to do if the destination already resolves to the source.
    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### Failed to create a symlink.
    ### Only Windows has fallbacks; elsewhere report the error.
    import platform
    if platform.system() != 'Windows':
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    if not src_path.is_dir():
        ### Last resort: copy the file on Windows.
        import shutil
        try:
            shutil.copy(src_path, dest_path)
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Directories may be linked with a junction.
    try:
        _winapi.CreateJunction(str(src_path), str(dest_path))
    except Exception as e:
        return False, str(e)
    return True, "Success"
1464
1465
def is_symlink(path: pathlib.Path) -> bool:
    """
    Check whether a path is a symlink, including Windows junctions.

    Parameters
    ----------
    path: pathlib.Path
        The path to inspect.

    Returns
    -------
    `True` if `path.is_symlink()` is true.
    On Windows, `True` is also returned when `os.readlink` yields a target.
    """
    import os
    import platform

    if path.is_symlink():
        return True

    if platform.system() == 'Windows':
        ### `os.readlink` raises `OSError` for paths which are not links.
        try:
            return bool(os.readlink(path))
        except OSError:
            return False

    return False
1479
1480
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    https://stackoverflow.com/a/26151604/9699829

    Parameters
    ----------
    dec: Callable[..., Any]
        A decorator whose first positional argument is the function to decorate.

    Returns
    -------
    A function which accepts the decorator's remaining arguments
    and returns the actual decorator to apply.
    The returned function keeps the name and docstring of `dec`.

    Examples
    --------
    >>> @parametrized
    ... def multiply(f, n):
    ...     def aux(*args, **kwargs):
    ...         return n * f(*args, **kwargs)
    ...     return aux
    >>> @multiply(3)
    ... def one():
    ...     return 1
    >>> one()
    3
    """
    from functools import wraps

    ### Preserve the metadata of `dec`, which is otherwise hidden behind `layer`.
    @wraps(dec)
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl
    return layer
1492
1493
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member's name would resolve to a path outside of `output_dir`.
    Nothing is extracted in that case.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def is_within_directory(target) -> bool:
        """Return whether `target` resolves to `output_dir` or a path beneath it."""
        abs_target = os.path.abspath(target)
        try:
            ### Compare whole path components. A character-wise prefix check
            ### would accept sibling directories such as `/out-evil` for `/out`.
            common = os.path.commonpath([abs_output_dir, abs_target])
        except ValueError:
            ### Raised when the paths cannot be compared (e.g. different drives).
            return False
        return common == abs_output_dir

    ### Validate every member before anything is written.
    ### NOTE(review): only member names are checked here, not link targets.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir)
1524
1525##################
1526# Legacy imports #
1527##################
1528
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _target
    result = _target(*args, **kwargs)
    return result
1536
1537
def print_options(*args, **kwargs) -> None:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.formatting.print_options`.
    """
    from meerschaum.utils.formatting import print_options as _target
    result = _target(*args, **kwargs)
    return result
1545
1546
def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _target
    result = _target(*args, **kwargs)
    return result
1554
1555
def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as _target
    result = _target(*args, **kwargs)
    return result
1563
1564
def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as _target
    result = _target(*args, **kwargs)
    return result
1572
1573
def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as _target
    result = _target(*args, **kwargs)
    return result
1581
1582
def df_from_literal(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as _target
    result = _target(*args, **kwargs)
    return result
1590
1591
def get_json_cols(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as _target
    result = _target(*args, **kwargs)
    return result
1599
1600
def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as _target
    result = _target(*args, **kwargs)
    return result
1608
1609
def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as _target
    result = _target(*args, **kwargs)
    return result
1617
1618
def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as _target
    result = _target(*args, **kwargs)
    return result
1626
1627
def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as _target
    result = _target(*args, **kwargs)
    return result
1635
1636
def choices_docstring(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as _target
    result = _target(*args, **kwargs)
    return result
1644
1645
def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that old imports from this module keep working.
    All arguments are forwarded to `meerschaum.actions._get_subaction_names`.
    """
    from meerschaum.actions import _get_subaction_names as _target
    result = _target(*args, **kwargs)
    return result
1653
1654
### Export every callable defined in this module whose name is not listed in `__pdoc__`.
### The loop variables live inside the generator expression,
### so `globals()` is not modified while it is being iterated.
_current_module = sys.modules[__name__]
__all__ = tuple(
    public_name
    for public_name, member in globals().items()
    if (
        public_name not in __pdoc__
        and callable(member)
        and getattr(member, '__module__', None) == _current_module.__name__
    )
)
def add_method_to_class( func: Callable[[Any], Any], class_def: "'Class'", method_name: Optional[str] = None, keep_self: Optional[bool] = None) -> Callable[[Any], Any]:
def add_method_to_class(
        func: Callable[[Any], Any],
        class_def: 'Class',
        method_name: Optional[str] = None,
        keep_self: Optional[bool] = None,
    ) -> Callable[[Any], Any]:
    """
    Attach function `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        Function to be added as a method of the class

    class_def: Class
        Class (or object) to be modified.

    method_name: Optional[str], default None
        New name of the method. None will use func.__name__ (default).

    keep_self: Optional[bool], default None
        If `False`, attach a wrapper which discards the first argument (`self`)
        before calling `func`. If `None`, the wrapper is used only when `class_def`
        is a class. Any other value attaches `func` itself.

    Returns
    -------
    The original function `func`.

    """
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kw):
        ### Swallow the bound instance; `func` does not expect it.
        return func(*args, **kw)

    if keep_self is None:
        drop_self = isinstance(class_def, type)
    else:
        drop_self = keep_self is False

    attribute_name = func.__name__ if method_name is None else method_name
    setattr(class_def, attribute_name, wrapper if drop_self else func)

    return func

Add function func to class class_def.

Parameters
  • func (Callable[[Any], Any]): Function to be added as a method of the class
  • class_def (Class): Class to be modified.
  • method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
  • The modified function object.
def generate_password(length: int = 12) -> str:
 93def generate_password(length: int = 12) -> str:
 94    """Generate a secure password of given length.
 95
 96    Parameters
 97    ----------
 98    length : int, default 12
 99        The length of the password.
100
101    Returns
102    -------
103    A random password string.
104
105    """
106    import secrets, string
107    return ''.join((secrets.choice(string.ascii_letters) for i in range(length)))

Generate a secure password of given length.

Parameters
  • length (int, default 12): The length of the password.
Returns
  • A random password string.
def is_int(s: str) -> bool:
def is_int(s: str) -> bool:
    """
    Check whether a value parses as a whole number.

    Parameters
    ----------
    s: str
        The string to be checked.

    Returns
    -------
    `True` if `float(s)` succeeds and the result has no fractional part
    (e.g. `'1'` and `'1.0'`), otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        return False

    return as_float.is_integer()

Check if string is an int.

Parameters
  • s (str): The string to be checked.
Returns
  • A bool indicating whether the string was able to be cast to an integer.
def string_to_dict(params_string: str) -> Dict[str, Any]:
def string_to_dict(
        params_string: str
    ) -> Dict[str, Any]:
    """
    Parse a string into a dictionary.

    If the string begins with '{', parse as JSON. Otherwise use simple parsing.

    Parameters
    ----------
    params_string: str
        The string to be parsed.
        In the simple syntax, entries are separated by commas and keys by colons
        (`a:b:1` produces `{'a': {'b': 1}}`).
        Empty entries (e.g. from a trailing comma) are ignored.

    Returns
    -------
    The parsed dictionary.

    Raises
    ------
    An `IndexError` if a non-empty entry in the simple syntax contains no colon.

    Examples
    --------
    >>> string_to_dict("a:1,b:2") 
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}
    >>> string_to_dict("a:b:1,")
    {'a': {'b': 1}}

    """
    if params_string == "":
        return {}

    import json

    ### Kind of a weird edge case.
    ### In the generated compose file, there is some weird escaping happening,
    ### so the string to be parsed starts and ends with a single quote.
    if (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    ):
        return json.loads(params_string[1:-1])
    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    def _parse_literal(text: str) -> Any:
        """Evaluate `text` as a Python literal, falling back to the raw string."""
        try:
            return ast.literal_eval(text)
        except Exception:
            return str(text)

    params_dict = {}
    for param in params_string.split(","):
        ### Skip blank entries, such as the one left behind by a trailing comma.
        if not param.strip():
            continue

        *keys, raw_val = param.split(":")
        val = _parse_literal(raw_val)

        ### Walk (and create) the nested dictionaries for all but the last key.
        c = params_dict
        for _k in keys[:-1]:
            k = _parse_literal(_k)
            if k not in c:
                c[k] = {}
            c = c[k]

        ### The final key is used verbatim (it is not evaluated as a literal).
        c[keys[-1]] = val

    return params_dict

Parse a string into a dictionary.

If the string begins with '{', parse as JSON. Otherwise use simple parsing.

Parameters
  • params_string (str): The string to be parsed.
Returns
  • The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2") 
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def parse_config_substitution( value: str, leading_key: str = 'MRSM', begin_key: str = '{', end_key: str = '}', delimeter: str = ':') -> List[Any]:
def parse_config_substitution(
        value: str,
        leading_key: str = 'MRSM',
        begin_key: str = '{',
        end_key: str = '}',
        delimeter: str = ':'
    ) -> Union[str, List[str]]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to parse.

    leading_key: str, default 'MRSM'
        The text which marks the start of a substitution.

    begin_key: str, default '{'
        The opening bracket which must directly follow `leading_key`.

    end_key: str, default '}'
        The closing bracket which must end `value`.

    delimeter: str, default ':'
        The separator between the keys inside the brackets.

    Returns
    -------
    The list of keys between the brackets,
    or `value` itself (unchanged) if it does not follow the substitution syntax.
    """
    prefix = leading_key + begin_key
    if not value.startswith(prefix) or not value.endswith(end_key):
        return value

    ### Slice with an explicit end index so that an empty `end_key` keeps the whole tail.
    inner = value[len(prefix):len(value) - len(end_key)]
    return inner.split(delimeter)

Parse Meerschaum substitution syntax E.g. MRSM{value1:value2} => ['value1', 'value2'] NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.

def edit_file( path: Union[pathlib.Path, str], default_editor: str = 'pyvim', debug: bool = False) -> bool:
def edit_file(
        path: Union[pathlib.Path, str],
        default_editor: str = 'pyvim',
        debug: bool = False
    ) -> bool:
    """
    Open a file in an editor.

    The command in the `$EDITOR` environment variable is tried first
    (or `default_editor` when it is unset). If launching it raises an exception,
    the `pyvim` package is run instead.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        If `$EDITOR` is not set, use this instead.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether the editor exited with a return code of 0.
    """
    import os
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor}'...")
        return_code = call([editor, path])
    except Exception as e:
        ### The editor could not be launched, so fall back to pyvim.
        if debug:
            dprint(e)
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        return_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return return_code == 0

Open a file for editing.

Attempt to launch the user's defined $EDITOR, otherwise use pyvim.

Parameters
  • path (Union[pathlib.Path, str]): The path to the file to be edited.
  • default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating the file was successfully edited.
def is_pipe_registered( pipe: meerschaum.Pipe, pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]], debug: bool = False) -> bool:
def is_pipe_registered(
        pipe: mrsm.Pipe,
        pipes: PipesDict,
        debug: bool = False
    ) -> bool:
    """
    Check whether a pipe's keys are present in a nested pipes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `connector_keys`, `metric_key`, and `location_key` are looked up.

    pipes: PipesDict
        The dictionary to search inside
        (nested by connector keys, then metric key, then location key).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether the pipe is inside the dictionary.
    """
    from meerschaum.utils.debug import dprint

    ck = pipe.connector_keys
    mk = pipe.metric_key
    lk = pipe.location_key
    if debug:
        dprint(f'{ck}, {mk}, {lk}')
        dprint(f'{pipe}, {pipes}')

    ### Descend one level at a time, bailing out at the first missing key.
    if ck not in pipes:
        return False
    if mk not in pipes[ck]:
        return False
    return lk in pipes[ck][mk]

Check if a Pipe is inside the pipes dictionary.

Parameters
  • pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
  • pipes (PipesDict): The dictionary to search inside.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating whether the pipe is inside the dictionary.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Determine the number of columns and lines in the terminal.

    If `os.get_terminal_size()` fails, fall back to the `COLUMNS` and `LINES`
    environment variables, and finally to the provided defaults.

    Parameters
    ----------
    default_cols: int, default 100
        If the columns cannot be determined, return this value.

    default_lines: int, default 120
        If the lines cannot be determined, return this value.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    import os

    try:
        terminal_size = os.get_terminal_size()
    except Exception:
        environment = os.environ
        fallback_cols = int(environment.get('COLUMNS', str(default_cols)))
        fallback_lines = int(environment.get('LINES', str(default_lines)))
        return fallback_cols, fallback_lines

    return terminal_size.columns, terminal_size.lines

Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).

Parameters
  • default_cols (int, default 100): If the columns cannot be determined, return this value.
  • default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
  • A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Walk through an iterable `chunksize` items at a time.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to iterate over in chunks.

    chunksize: int
        The size of chunks to iterate with.

    fillvalue: Optional[Any], default None
        If the chunks do not evenly divide into the iterable, pad the end with this value.

    Returns
    -------
    An iterator of tuples of size `chunksize`.

    """
    from itertools import zip_longest

    ### Every slot refers to the SAME iterator, so each output tuple
    ### consumes `chunksize` consecutive items from it.
    shared_iterator = iter(iterable)
    slots = (shared_iterator,) * chunksize
    return zip_longest(*slots, fillvalue=fillvalue)

Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

Parameters
  • iterable (Iterable[Any]): The iterable to iterate over in chunks.
  • chunksize (int): The size of chunks to iterate with.
  • fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
  • A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Build a new dictionary whose items are ordered by their values.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary ordered by value.
    If sorting fails (e.g. the values cannot be compared),
    the input object itself is returned unchanged.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        ordered_pairs = sorted(d.items(), key=lambda pair: pair[1])
        return dict(ordered_pairs)
    except Exception:
        ### Best effort: hand back the input as-is when it cannot be sorted.
        return d

Sort a dictionary's values and return a new dictionary.

Parameters
  • d (Dict[Any, Any]): The dictionary to be sorted.
Returns
  • A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict( pipes_dict: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> 'List[Pipe]':
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collapse the three-level pipes dictionary into a flat list of its leaf values.

    Parameters
    ----------
    pipes_dict: PipesDict
        The pipes dictionary to be flattened.

    Returns
    -------
    A list of `Pipe` objects, in the dictionary's iteration order.

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]

Convert the standard pipes dictionary into a list.

Parameters
  • pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
  • A list of Pipe objects.
def round_time( dt: Optional[datetime.datetime] = None, date_delta: Optional[datetime.timedelta] = None, to: str = 'down') -> datetime.datetime:
def round_time(
        dt: Optional[datetime] = None,
        date_delta: Optional[timedelta] = None,
        to: 'str' = 'down'
    ) -> datetime:
    """
    Snap a datetime to a multiple of a timedelta, measured from midnight.
    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

    NOTE: Timezone information is ignored when computing the offset,
    and when `dt` is `None` the current UTC time is used as a naive datetime.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any value other than `'up'` or `'down'` is treated as `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    step = (timedelta(minutes=1) if date_delta is None else date_delta).total_seconds()
    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    ### Whole seconds elapsed since midnight (the day component is not included).
    elapsed = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds

    ### A value already sitting exactly on a step boundary must not move,
    ### so it takes the 'closest' formula regardless of `to`.
    on_boundary = elapsed % step == 0 and dt.microsecond == 0

    if to == 'up' and not on_boundary:
        target = (elapsed + dt.microsecond / 1000000 + step) // step * step
    elif to == 'down' and not on_boundary:
        target = elapsed // step * step
    else:
        target = (elapsed + step / 2) // step * step

    return dt + timedelta(0, target - elapsed, -dt.microsecond)

Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

NOTE: This function strips timezone information!

Parameters
  • dt (Optional[datetime], default None): If None, grab the current UTC datetime.
  • date_delta (Optional[timedelta], default None): If None, use a delta of 1 minute.
  • to ('str', default 'down'): Available options are 'up', 'down', and 'closest'.
Returns
  • A rounded datetime object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 15, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
...   datetime(2022, 1, 1, 12, 45, 57, 200),
...   timedelta(hours=1),
...   to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
def timed_input( seconds: int = 10, timeout_message: str = '', prompt: str = '', icon: bool = False, **kw) -> Optional[str]:
def timed_input(
        seconds: int = 10,
        timeout_message: str = "",
        prompt: str = "",
        icon: bool = False,
        **kw
    ) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    This relies on `signal.SIGALRM`. The alarm handler installed for the
    duration of the call is removed afterwards, and the previously registered
    handler is put back.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        NOTE: Currently accepted but not used by this function.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        If `True`, print the configured input icon.
        NOTE: Currently accepted but not used by this function.


    Returns
    -------
    The input string entered by the user, or `None` if the window elapsed
    (or no input could be read).

    """
    import signal, time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Remember whichever handler was registered so it can be restored on exit.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(seconds) # produce SIGALRM in `seconds` seconds

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        ### No readable stdin: show the prompt and wait out the window.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            return None
        return None
    finally:
        signal.alarm(0) # cancel alarm
        ### `signal.signal` returns `None` when the old handler was not installed from Python.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

Accept user input only for a brief period of time.

Parameters
  • seconds (int, default 10): The number of seconds to wait.
  • timeout_message (str, default ''): The message to print after the window has elapsed.
  • prompt (str, default ''): The prompt to print during the window.
  • icon (bool, default False): If True, print the configured input icon.
Returns
  • The input string entered by the user.
def replace_pipes_in_dict( pipes: Optional[Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]] = None, func: "'function'" = <class 'str'>, debug: bool = False, **kw) -> Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]:
def replace_pipes_in_dict(
        pipes : Optional[PipesDict] = None,
        func: 'function' = str,
        debug: bool = False,
        **kw
    ) -> PipesDict:
    """
    Return a copy of a pipes dictionary in which every leaf value
    has been swapped for the result of calling `func` on it.

    Parameters
    ----------
    pipes: Optional[PipesDict], default None
        The pipes dict to be processed.
        If `None`, the dictionary is fetched with `meerschaum.get_pipes(debug=debug, **kw)`.

    func: Callable[[Any], Any], default str
        The function to be applied to every pipe.
        Defaults to the string constructor.

    debug: bool, default False
        Verbosity toggle.


    Returns
    -------
    A dictionary where every pipe is replaced with the output of a function.
    The input dictionary is not modified.

    """
    from copy import deepcopy

    def _apply_in_place(node: Dict[Any, Any]) -> None:
        ### Only existing keys are reassigned, so iterating while assigning is safe.
        for key in node:
            child = node[key]
            if isinstance(child, dict):
                _apply_in_place(child)
                continue
            node[key] = func(child)

    if pipes is None:
        from meerschaum import get_pipes
        pipes = get_pipes(debug=debug, **kw)

    replaced = deepcopy(pipes)
    _apply_in_place(replaced)
    return replaced

Replace the Pipes in a Pipes dict with the result of another function.

Parameters
  • pipes (Optional[PipesDict], default None): The pipes dict to be processed.
  • func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary where every pipe is replaced with the output of a function.
def enforce_gevent_monkey_patch():
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    import socket
    from meerschaum.utils.packages import attempt_import

    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    ### The standard socket class being gevent's own is taken as proof of patching.
    if socket.socket is gevent_socket.socket:
        return
    gevent_monkey.patch_all()

Check if gevent monkey patching is enabled, and if not, then apply patching.

def is_valid_email(email: str) -> Optional[re.Match]:
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Check whether a string is a valid email.

    The check is a pragmatic pattern match (not a full RFC 5322 validation):
    an alphanumeric local part whose segments may be joined by single
    `.`, `_`, `+` or `-` characters, an `@`, a domain of one or more labels
    joined by `.` or `-`, and a final label of at least two characters.

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>
    >>> is_valid_email('First.Last@mail.example.info') is not None
    True

    """
    import re
    ### Accepts everything the previous, stricter pattern accepted, plus uppercase letters,
    ### one-character local parts, `+`/`-` separators, subdomains, and long TLDs.
    regex = r'^[A-Za-z0-9]+(?:[._+-][A-Za-z0-9]+)*@\w+(?:[.-]\w+)*[.]\w{2,}$'
    return re.search(regex, email)

Check whether a string is a valid email.

Parameters
  • email (str): The string to be examined.
Returns
  • None if a string is not in email format, otherwise a re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
def string_width(string: str, widest: bool = True) -> int:
    """
    Measure a (possibly multi-line) string by the length of its longest line.

    Parameters
    ----------
    string: str
        The string to be examined.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer for the text's visual width.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    return max(len(line) for line in string.split('\n'))

Calculate the width of a string, either by its widest or last line.

Parameters
  • string (str): The string to be examined.
  • widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
  • An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def _pyinstaller_traverse_dir( directory: str, ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), include_dotfiles: bool = False) -> list:
635def _pyinstaller_traverse_dir(
636        directory: str,
637        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
638        include_dotfiles: bool = False
639    ) -> list:
640    """
641    Recursively traverse a directory and return a list of its contents.
642    """
643    import os, pathlib
644    paths = []
645    _directory = pathlib.Path(directory)
646
647    def _found_pattern(name: str):
648        for pattern in ignore_patterns:
649            if pattern.replace('/', os.path.sep) in str(name):
650                return True
651        return False
652
653    for root, dirs, files in os.walk(_directory):
654        _root = str(root)[len(str(_directory.parent)):]
655        if _root.startswith(os.path.sep):
656            _root = _root[len(os.path.sep):]
657        if _root.startswith('.') and not include_dotfiles:
658            continue
659        ### ignore certain patterns
660        if _found_pattern(_root):
661            continue
662
663        for filename in files:
664            if filename.startswith('.') and not include_dotfiles:
665                continue
666            path = os.path.join(root, filename)
667            if _found_pattern(path):
668                continue
669
670            _path = str(path)[len(str(_directory.parent)):]
671            if _path.startswith(os.path.sep):
672                _path = _path[len(os.path.sep):]
673            _path = os.path.sep.join(_path.split(os.path.sep)[:-1])
674
675            paths.append((path, _path))
676    return paths

Recursively traverse a directory and return a list of its contents.

def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.

    Returns
    -------
    A deep copy of the dictionary where values of keys containing `'password'`
    (case-insensitive) are replaced with `replace_with` (`'*'`) repeated once
    per character. For keys equal to `'uri'`, a `username:password` pair that
    can be parsed from the value has its password portion masked the same way.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy
    _d = copy.deepcopy(d)
    for k, v in d.items():
        if isinstance(v, dict):
            ### Forward `replace_with` so nested dictionaries use the same mask.
            _d[k] = replace_password(v, replace_with=replace_with)
        elif 'password' in str(k).lower():
            _d[k] = replace_with * len(str(v))
        elif str(k).lower() == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(v)
            except Exception:
                ### Values which cannot be parsed as a URI are left untouched.
                uri_params = None
            if not uri_params:
                continue
            if 'username' not in uri_params or 'password' not in uri_params:
                continue
            credentials = uri_params['username'] + ':' + uri_params['password']
            masked_credentials = (
                uri_params['username'] + ':'
                + replace_with * len(str(uri_params['password']))
            )
            _d[k] = v.replace(credentials, masked_credentials)
    return _d

Recursively replace passwords in a dictionary.

Parameters
  • d (Dict[str, Any]): The dictionary to search through.
  • replace_with (str, default '*'): The string to replace each character of the password with.
Returns
  • Another dictionary where values to the keys 'password' are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments( func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Tuple[Tuple[Any], Dict[str, Any]]:
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Keep only the positional and keyword arguments which `func` supports.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter and pass to `func`.

    **kwargs
        Keyword arguments to filter and pass to `func`.

    Returns
    -------
    The `args` and `kwargs` accepted by `func`,
    as produced by `filter_positionals` and `filter_keywords` respectively.
    """
    return filter_positionals(func, *args), filter_keywords(func, **kwargs)

Filter out unsupported positional and keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): Positional arguments to filter and pass to func.
  • **kwargs: Keyword arguments to filter and pass to func.
Returns
  • The args and kwargs accepted by func.
def filter_keywords(func: Callable[[Any], Any], **kw: Any) -> Dict[str, Any]:
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, all of `kw` is returned.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters

    ### If the function has a **kw parameter, skip filtering.
    ### Inspect the parameter kind rather than its string form, which would
    ### misfire on defaults or annotations that merely contain '**'.
    if any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    ):
        return kw

    ### Only these kinds may legally be supplied by keyword.
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return {
        key: value
        for key, value in kw.items()
        if key in func_params and func_params[key].kind in keyword_kinds
    }

Filter out unsupported keyword arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • **kw (Any): The arguments to be filtered and passed into func.
Returns
  • A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
...     return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(func: Callable[[Any], Any], *args: Any) -> Tuple[Any]:
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`, i.e. one value for each
    leading parameter which may be passed positionally and has no default.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    acceptable_args: List[Any] = []
    num_invalid: int = 0

    for i, param in enumerate(func_params.values()):
        ### Stop at the first parameter with a default or which cannot be given
        ### positionally (`*args`, keyword-only, `**kwargs`). Checking the kind and
        ### default avoids misreading '=' or '*' inside an annotation's text.
        if (
            param.kind not in positional_kinds
            or param.default is not inspect.Parameter.empty
        ):
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Imported lazily: only needed when a warning is actually emitted.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have' if num_invalid != 1 else " has")
            + " been filled with `None`.",
        )

    return tuple(acceptable_args)

Filter out unsupported positional arguments.

Parameters
  • func (Callable[[Any], Any]): The function to inspect.
  • *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
  • A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
...     return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an ordered dict, converting values which are
    themselves ordered dicts (or instances of subclasses) recursively.
    Does not mutate the original OrderedDict.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }

Convert an ordered dict to a dict. Does not mutate the original OrderedDict.

def remove_ansi(s: str) -> str:
def remove_ansi(s : str) -> str:
    """
    Strip ANSI escape sequences out of a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI characters removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    ansi_pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(ansi_pattern, '', s)

Remove ANSI escape characters from a string.

Parameters
  • s (str): The string to be cleaned.
Returns
  • A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(*types: str, search_term: str = '', ignore_exact_match=True) -> List[str]:
def get_connector_labels(
        *types: str,
        search_term: str = '',
        ignore_exact_match = True,
    ) -> List[str]:
    """
    Collect `type:label` connector keys from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use every type defined in the configuration plus `'plugin'`.

    search_term: str, default ''
        A prefix which the returned keys must start with.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    Returns
    -------
    A sorted list of the keys of defined connectors.
    Labels named `'default'` are never included.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    selected_types = list(types) or (list(connectors.keys()) + ['plugin'])

    keys = []
    for typ in selected_types:
        if typ != 'plugin':
            keys.extend(
                f'{typ}:{label}'
                for label in connectors.get(typ, {})
                if label != 'default'
            )
            continue

        ### Plugin connectors are keyed by the last component of each plugin's module name.
        from meerschaum.plugins import get_data_plugins
        keys.extend(
            f'{typ}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        )

    ### When exact matches are allowed, compare against '' instead.
    excluded = search_term if ignore_exact_match else ''
    return sorted(
        key for key in keys
        if key.startswith(search_term) and key != excluded
    )

Read connector labels from the configuration dictionary.

Parameters
  • *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
  • search_term (str, default ''): A filter on the connectors' labels.
  • ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
  • A list of the keys of defined connectors.
def json_serialize_datetime(dt: datetime.datetime) -> Optional[str]:
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Convert a datetime into an ISO format string for use as a `json` `default` hook.

    Naive datetimes get a trailing `'Z'`; timezone-aware datetimes keep the
    offset produced by `isoformat()`. Anything that is not a `datetime`
    yields `None`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if isinstance(dt, datetime):
        iso_string = dt.isoformat()
        return iso_string if dt.tzinfo is not None else iso_string + 'Z'
    return None

Serialize a datetime object into JSON (ISO format string).

Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def wget( url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, color: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
 973def wget(
 974        url: str,
 975        dest: Optional[Union[str, 'pathlib.Path']] = None,
 976        headers: Optional[Dict[str, Any]] = None,
 977        color: bool = True,
 978        debug: bool = False,
 979        **kw: Any
 980    ) -> 'pathlib.Path':
 981    """
 982    Mimic `wget` with `requests`.
 983
 984    Parameters
 985    ----------
 986    url: str
 987        The URL to the resource to be downloaded.
 988        
 989    dest: Optional[Union[str, pathlib.Path]], default None
 990        The destination path of the downloaded file.
 991        If `None`, save to the current directory.
 992
 993    color: bool, default True
 994        If `debug` is `True`, print color output.
 995
 996    debug: bool, default False
 997        Verbosity toggle.
 998
 999    Returns
1000    -------
1001    The path to the downloaded file.
1002
1003    """
1004    from meerschaum.utils.warnings import warn, error
1005    from meerschaum.utils.debug import dprint
1006    import os, pathlib, re, urllib.request
1007    if headers is None:
1008        headers = {}
1009    request = urllib.request.Request(url, headers=headers)
1010    if not color:
1011        dprint = print
1012    if debug:
1013        dprint(f"Downloading from '{url}'...")
1014    try:
1015        response = urllib.request.urlopen(request)
1016    except Exception as e:
1017        import ssl
1018        ssl._create_default_https_context = ssl._create_unverified_context
1019        try:
1020            response = urllib.request.urlopen(request)
1021        except Exception as _e:
1022            print(_e)
1023            response = None
1024    if response is None or response.code != 200:
1025        error_msg = f"Failed to download from '{url}'."
1026        if color:
1027            error(error_msg)
1028        else:
1029            print(error_msg)
1030            import sys
1031            sys.exit(1)
1032
1033    d = response.headers.get('content-disposition', None)
1034    fname = (
1035        re.findall("filename=(.+)", d)[0].strip('"') if d is not None
1036        else url.split('/')[-1]
1037    )
1038
1039    if dest is None:
1040        dest = pathlib.Path(os.path.join(os.getcwd(), fname))
1041    elif isinstance(dest, str):
1042        dest = pathlib.Path(dest)
1043
1044    with open(dest, 'wb') as f:
1045        f.write(response.fp.read())
1046
1047    if debug:
1048        dprint(f"Downloaded file '{dest}'.")
1049
1050    return dest

Mimic wget with urllib.

Parameters
  • url (str): The URL to the resource to be downloaded.
  • dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
  • color (bool, default True): If debug is True, print color output.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The path to the downloaded file.
def async_wrap(func):
def async_wrap(func):
    """
    Wrap a synchronous function in a coroutine function which runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments:
    `loop` (defaults to `asyncio.get_event_loop()`) and `executor`
    (handed to `loop.run_in_executor`); all other arguments are forwarded to `func`.
    """
    import asyncio
    import functools

    @functools.wraps(func)
    async def _run_in_executor(*args, loop=None, executor=None, **kwargs):
        target_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = functools.partial(func, *args, **kwargs)
        return await target_loop.run_in_executor(executor, bound_call)

    return _run_in_executor
def debug_trace(browser: bool = True):
def debug_trace(browser: bool = True):
    """
    Open a web-based debugger to trace the execution of the program.

    This is a thin wrapper which imports `meerschaum.utils.debug.trace`
    when called and forwards `browser` to it.

    Parameters
    ----------
    browser: bool, default True
        Passed through unchanged as the `browser` keyword argument of `trace`.
    """
    from meerschaum.utils.debug import trace
    trace(browser=browser)

Open a web-based debugger to trace the execution of the program.

This is an alias import for meerschaum.utils.debug.trace.

def items_str( items: List[Any], quotes: bool = True, quote_str: str = "'", commas: bool = True, comma_str: str = ',', and_: bool = True, and_str: str = 'and', oxford_comma: bool = True, spaces: bool = True, space_str=' ') -> str:
def items_str(
        items: List[Any],
        quotes: bool = True,
        quote_str: str = "'",
        commas: bool = True,
        comma_str: str = ',',
        and_: bool = True,
        and_str: str = 'and',
        oxford_comma: bool = True,
        spaces: bool = True,
        space_str: str = ' ',
    ) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable is accepted (e.g. sets, dictionary keys, generators).

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, insert this string after each item which needs a comma.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    # Materialize once so that non-sliceable iterables behave like lists.
    items = list(items or [])
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    quoted_items = [q + str(item) + q for item in items]
    if len(quoted_items) == 1:
        return quoted_items[0]

    # Without a conjunction, every item is simply comma-separated
    # (the Oxford comma setting only matters when 'and' is present).
    if not and_:
        return (c + s).join(quoted_items)

    if len(quoted_items) == 2:
        return quoted_items[0] + s + and_str + s + quoted_items[1]

    output = (c + s).join(quoted_items[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted_items[-1]

Return a formatted string of list items separated by commas.

Parameters
  • items ([List[Any]]): The items to be printed as an English list.
  • quotes (bool, default True): If True, wrap items in quotes.
  • quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
  • and_ (bool, default True): If True, include the word 'and' before the final item in the list.
  • and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in an English list.
  • oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
  • spaces (bool, default True): If True, separate items with space_str
  • space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
  • A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[datetime.timedelta, int]) -> str:
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Format a `timedelta` (or a number of minutes) as a human-readable string.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to describe. Values which are not a `timedelta`
        are treated as minutes and multiplied by 60 to get seconds.

    Returns
    -------
    The output of `humanfriendly.format_timespan()` for the interval in seconds.
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')

    if isinstance(delta, timedelta):
        seconds = delta.total_seconds()
    else:
        seconds = delta * 60

    return humanfriendly.format_timespan(seconds)

Return a human-readable string for a timedelta (or int minutes).

Parameters
  • delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
Returns
  • A formatted string, fit for human eyes.
def is_docker_available() -> bool:
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Returns
    -------
    `True` if `docker ps` exits with status 0.
    `False` on a non-zero exit status or if the command could not be run.
    """
    import subprocess

    command = ['docker', 'ps']
    try:
        # Discard all output; only the exit status matters.
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if we can connect to the Docker engine.

def is_android() -> bool:
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    The platform is detected by the presence of `sys.getandroidapilevel`.
    """
    import sys
    android_marker = 'getandroidapilevel'
    return hasattr(sys, android_marker)

Return True if the current platform is Android.

def is_bcp_available() -> bool:
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Returns
    -------
    `True` if `bcp -v` exits with status 0.
    `False` on a non-zero exit status or if the command could not be run.
    """
    import subprocess

    command = ['bcp', '-v']
    try:
        # Discard all output; only the exit status matters.
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0

Check if the MSSQL bcp utility is installed.

def get_last_n_lines(file_name: str, N: int):
def get_last_n_lines(file_name: str, N: int) -> List[str]:
    """
    Return the last `N` lines of a file, like the `tail` command.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read.

    N: int
        The maximum number of lines to return.
        If `N` is not positive, every line in the file is returned.

    Returns
    -------
    A list of lines in file order, split on `\\n` and decoded with the
    default codec of `bytes.decode()`. If the file ends with a newline,
    the final element is an empty string.
    """
    import os
    # Lines are collected from the end of the file towards the beginning.
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        # Move the cursor to the end of the file.
        read_obj.seek(0, os.SEEK_END)
        # Holds the bytes of the current line in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        # Walk backwards one byte at a time until the top of the file.
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                # Restore the byte order BEFORE decoding,
                # otherwise multi-byte characters are corrupted.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # Whatever remains in the buffer is the first line of the file.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file
        An open, seekable file object. Its position is changed by this function.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines at the end of the file to skip.

    Returns
    -------
    A tuple of `(lines, has_more)`. The lines are whatever
    `f.read().splitlines()` produces (`bytes` or `str`).
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    # Nothing was requested: only report whether the file has content.
    if to_read <= 0:
        f.seek(0, 2)
        return [], f.tell() > 0

    while True:
        try:
            # `seek()` needs an integer; the estimate is a float once it has grown.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        # A window starting mid-file may begin with a partial line,
        # so demand one extra line to be able to discard it.
        if len(lines) > to_read or pos == 0:
            end = -offset if offset else None
            return lines[-to_read:end], len(lines) > to_read or pos > 0
        # Not enough lines were captured: widen the window and retry.
        avg_line_length *= 1.3

https://stackoverflow.com/a/692616/9699829

Reads n lines from f with an offset of offset lines. The return value is a tuple in the form (lines, has_more) where has_more is an indicator that is True if there are more lines in the file.

def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.
    Strings shorter than `max_len` are returned unchanged.

    Raises
    ------
    An `Exception` if every section is already a single character
    and the string is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    # An empty delimiter cannot be split on, so treat the item as one section.
    sections = item.split(delimeter) if delimeter else [item]

    # Reserve room for the delimiters (budgeted as one per section).
    available_chars = max_len - (len(delimeter) * len(sections))

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        # Drop the last character of every section which can still spare one.
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        new_sections_len = sum(len(section) for section in sections)
        # No progress means every section is down to a single character.
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)

Remove characters from each section of a string until the length is within the limit.

Parameters
  • item (str): The item name to be truncated.
  • delimeter (str, default '_'): Split item by this string into several sections.
  • max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
  • The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def separate_negation_values( vals: Union[List[str], Tuple[str]], negation_prefix: Optional[str] = None) -> Tuple[List[str], List[str]]:
def separate_negation_values(
        vals: Union[List[str], Tuple[str]],
        negation_prefix: Optional[str] = None,
    ) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.
    Return two lists: positive and negative values.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix go into the second list
        (as strings, with the prefix removed).
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of two lists: the values without the prefix (unchanged)
    and the negated values (prefix stripped).
    """
    if negation_prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    prefix_len = len(negation_prefix)
    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(negation_prefix):
            # Positive values keep their original type.
            included.append(val)
            continue
        excluded.append(val_str[prefix_len:])

    return included, excluded

Separate the negated values from the positive ones. Return two lists: positive and negative values.

Parameters
  • vals (Union[List[str], Tuple[str]]): A list of strings to parse.
  • negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params( params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not lists or tuples are treated as a single-item list.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.
    An empty dictionary is returned if `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        # Normalize scalars so every column is parsed the same way.
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params

Translate a params dictionary into lists of include- and exclude-values.

Parameters
  • params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
  • A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
def flatten_list(list_: List[Any]) -> Generator[Any, None, None]:
    """
    Recursively flatten a list.

    Parameters
    ----------
    list_: List[Any]
        The (possibly nested) list to flatten.
        Only `list` instances are expanded; tuples and other iterables
        are yielded as-is.

    Returns
    -------
    A generator which lazily yields the non-list items in depth-first order.
    Wrap the result in `list()` if a list is needed.
    """
    # Walk with an explicit stack of iterators so that deeply nested
    # lists do not hit the interpreter's recursion limit.
    stack = [iter(list_)]
    while stack:
        for item in stack[-1]:
            if isinstance(item, list):
                # Descend into the nested list and resume the parent afterwards.
                stack.append(iter(item))
                break
            yield item
        else:
            # The innermost iterator is exhausted.
            stack.pop()

Recursively flatten a list.

def parametrized(dec):
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    The decorated decorator `dec` is expected to take the target function as
    its first argument, followed by its own parameters:
    `@dec(*args, **kwargs)` on a function `f` results in `dec(f, *args, **kwargs)`.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*dec_args, **dec_kwargs):
        # First call: capture the decorator's parameters.
        def repl(func):
            # Second call: apply the decorator to the target function.
            return dec(func, *dec_args, **dec_kwargs)
        return repl
    return layer

A meta-decorator for allowing other decorator functions to have parameters.

https://stackoverflow.com/a/26151604/9699829

def safely_extract_tar(tarf: "'file'", output_dir: Union[str, pathlib.Path]) -> None:
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member's path resolves outside of `output_dir`.
    Nothing is extracted in that case.
    """
    import os

    def is_within_directory(directory, target):
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        try:
            # Compare whole path components: a character-wise prefix check
            # would accept sibling directories such as `out_evil` for `out`.
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            # Raised when the paths cannot share a root (e.g. different drives).
            return False
        return common_path == abs_directory

    # Validate every member before anything is written to disk.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(output_dir, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)

Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.

Parameters
  • tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
  • output_dir (Union[str, pathlib.Path]): The output directory.
def choose_subaction(*args, **kwargs) -> Any:
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that existing imports of this name keep working.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`,
    and its result is returned unchanged.
    """
    from meerschaum.actions import choose_subaction as _actions_choose_subaction
    result = _actions_choose_subaction(*args, **kwargs)
    return result

Placeholder function to prevent breaking legacy behavior. See choose_subaction.