meerschaum.utils.misc
Miscellaneous functions go here
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4""" 5Miscellaneous functions go here 6""" 7 8from __future__ import annotations 9 10import sys 11import functools 12from datetime import timedelta, datetime 13 14from meerschaum.utils.typing import ( 15 Union, 16 Any, 17 Callable, 18 Optional, 19 List, 20 Dict, 21 SuccessTuple, 22 Iterable, 23 PipesDict, 24 Tuple, 25 TYPE_CHECKING, 26) 27if TYPE_CHECKING: 28 import collections 29 30__pdoc__: Dict[str, bool] = { 31 'to_pandas_dtype': False, 32 'filter_unseen_df': False, 33 'add_missing_cols_to_df': False, 34 'parse_df_datetimes': False, 35 'df_from_literal': False, 36 'get_json_cols': False, 37 'get_unhashable_cols': False, 38 'enforce_dtypes': False, 39 'get_datetime_bound_from_df': False, 40 'df_is_chunk_generator': False, 41 'choices_docstring': False, 42 '_get_subaction_names': False, 43 'is_pipe_registered': False, 44 'replace_pipes_in_dict': False, 45 'round_time': False, 46} 47 48 49def add_method_to_class( 50 func: Callable[[Any], Any], 51 class_def: 'Class', 52 method_name: Optional[str] = None, 53 keep_self: Optional[bool] = None, 54) -> Callable[[Any], Any]: 55 """ 56 Add function `func` to class `class_def`. 57 58 Parameters 59 ---------- 60 func: Callable[[Any], Any] 61 Function to be added as a method of the class 62 63 class_def: Class 64 Class to be modified. 65 66 method_name: Optional[str], default None 67 New name of the method. None will use func.__name__ (default). 68 69 Returns 70 ------- 71 The modified function object. 
72 73 """ 74 from functools import wraps 75 76 is_class = isinstance(class_def, type) 77 78 @wraps(func) 79 def wrapper(self, *args, **kw): 80 return func(*args, **kw) 81 82 if method_name is None: 83 method_name = func.__name__ 84 85 setattr(class_def, method_name, ( 86 wrapper if ((is_class and keep_self is None) or keep_self is False) else func 87 ) 88 ) 89 90 return func 91 92 93def generate_password(length: int = 12) -> str: 94 """ 95 Generate a secure password of given length. 96 97 Parameters 98 ---------- 99 length: int, default 12 100 The length of the password. 101 102 Returns 103 ------- 104 A random password string. 105 """ 106 import secrets 107 import string 108 return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length))) 109 110 111def is_int(s: str) -> bool: 112 """ 113 Check if string is an int. 114 115 Parameters 116 ---------- 117 s: str 118 The string to be checked. 119 120 Returns 121 ------- 122 A bool indicating whether the string was able to be cast to an integer. 123 124 """ 125 try: 126 return float(s).is_integer() 127 except Exception: 128 return False 129 130 131def is_uuid(s: str) -> bool: 132 """ 133 Check if a string is a valid UUID. 134 135 Parameters 136 ---------- 137 s: str 138 The string to be checked. 139 140 Returns 141 ------- 142 A bool indicating whether the string is a valid UUID. 143 """ 144 import uuid 145 try: 146 uuid.UUID(str(s)) 147 return True 148 except Exception: 149 return False 150 151 152def string_to_dict(params_string: str) -> Dict[str, Any]: 153 """ 154 Parse a string into a dictionary. 155 156 If the string begins with '{', parse as JSON. Otherwise use simple parsing. 157 158 Parameters 159 ---------- 160 params_string: str 161 The string to be parsed. 162 163 Returns 164 ------- 165 The parsed dictionary. 
166 167 Examples 168 -------- 169 >>> string_to_dict("a:1,b:2") 170 {'a': 1, 'b': 2} 171 >>> string_to_dict('{"a": 1, "b": 2}') 172 {'a': 1, 'b': 2} 173 174 """ 175 if not params_string: 176 return {} 177 178 import json 179 180 ### Kind of a weird edge case. 181 ### In the generated compose file, there is some weird escaping happening, 182 ### so the string to be parsed starts and ends with a single quote. 183 if ( 184 isinstance(params_string, str) 185 and len(params_string) > 4 186 and params_string[1] == "{" 187 and params_string[-2] == "}" 188 ): 189 return json.loads(params_string[1:-1]) 190 191 if str(params_string).startswith('{'): 192 return json.loads(params_string) 193 194 import ast 195 params_dict = {} 196 197 items = [] 198 bracket_level = 0 199 brace_level = 0 200 current_item = '' 201 in_quotes = False 202 quote_char = '' 203 204 i = 0 205 while i < len(params_string): 206 char = params_string[i] 207 208 if in_quotes: 209 if char == quote_char and (i == 0 or params_string[i-1] != '\\'): 210 in_quotes = False 211 else: 212 if char in ('"', "'"): 213 in_quotes = True 214 quote_char = char 215 elif char == '[': 216 bracket_level += 1 217 elif char == ']': 218 bracket_level -= 1 219 elif char == '{': 220 brace_level += 1 221 elif char == '}': 222 brace_level -= 1 223 elif char == ',' and bracket_level == 0 and brace_level == 0: 224 items.append(current_item) 225 current_item = '' 226 i += 1 227 continue 228 229 current_item += char 230 i += 1 231 232 if current_item: 233 items.append(current_item) 234 235 for param in items: 236 param = param.strip() 237 if not param: 238 continue 239 240 _keys = param.split(":", maxsplit=1) 241 if len(_keys) != 2: 242 continue 243 244 keys = _keys[:-1] 245 try: 246 val = ast.literal_eval(_keys[-1]) 247 except Exception: 248 val = str(_keys[-1]) 249 250 c = params_dict 251 for _k in keys[:-1]: 252 try: 253 k = ast.literal_eval(_k) 254 except Exception: 255 k = str(_k) 256 if k not in c: 257 c[k] = {} 258 c = c[k] 259 
260 c[keys[-1]] = val 261 262 return params_dict 263 264 265def to_simple_dict(doc: Dict[str, Any]) -> str: 266 """ 267 Serialize a document dictionary in simple-dict format. 268 """ 269 import json 270 import ast 271 from meerschaum.utils.dtypes import json_serialize_value 272 273 def serialize_value(value): 274 if isinstance(value, str): 275 try: 276 evaluated = ast.literal_eval(value) 277 if not isinstance(evaluated, str): 278 return json.dumps(value, separators=(',', ':'), default=json_serialize_value) 279 return value 280 except (ValueError, SyntaxError, TypeError, MemoryError): 281 return value 282 283 return json.dumps(value, separators=(',', ':'), default=json_serialize_value) 284 285 return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items()) 286 287 288def parse_config_substitution( 289 value: str, 290 leading_key: str = 'MRSM', 291 begin_key: str = '{', 292 end_key: str = '}', 293 delimeter: str = ':', 294) -> List[Any]: 295 """ 296 Parse Meerschaum substitution syntax 297 E.g. MRSM{value1:value2} => ['value1', 'value2'] 298 NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`. 299 """ 300 if not value.beginswith(leading_key): 301 return value 302 303 return leading_key[len(leading_key):][len():-1].split(delimeter) 304 305 306def edit_file( 307 path: Union['pathlib.Path', str], 308 default_editor: str = 'pyvim', 309 debug: bool = False 310) -> bool: 311 """ 312 Open a file for editing. 313 314 Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`. 315 316 Parameters 317 ---------- 318 path: Union[pathlib.Path, str] 319 The path to the file to be edited. 320 321 default_editor: str, default 'pyvim' 322 If `$EDITOR` is not set, use this instead. 323 If `pyvim` is not installed, it will install it from PyPI. 324 325 debug: bool, default False 326 Verbosity toggle. 327 328 Returns 329 ------- 330 A bool indicating the file was successfully edited. 
331 """ 332 import os 333 from subprocess import call 334 from meerschaum.utils.debug import dprint 335 from meerschaum.utils.packages import run_python_package, attempt_import, package_venv 336 try: 337 EDITOR = os.environ.get('EDITOR', default_editor) 338 if debug: 339 dprint(f"Opening file '{path}' with editor '{EDITOR}'...") 340 rc = call([EDITOR, path]) 341 except Exception as e: ### can't open with default editors 342 if debug: 343 dprint(str(e)) 344 dprint('Failed to open file with system editor. Falling back to pyvim...') 345 pyvim = attempt_import('pyvim', lazy=False) 346 rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug) 347 return rc == 0 348 349 350def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]: 351 """ 352 Determine the columns and lines in the terminal. 353 If they cannot be determined, return the default values (100 columns and 120 lines). 354 355 Parameters 356 ---------- 357 default_cols: int, default 100 358 If the columns cannot be determined, return this value. 359 360 default_lines: int, default 120 361 If the lines cannot be determined, return this value. 362 363 Returns 364 ------- 365 A tuple if integers for the columns and lines. 366 """ 367 import os 368 try: 369 size = os.get_terminal_size() 370 _cols, _lines = size.columns, size.lines 371 except Exception: 372 _cols, _lines = ( 373 int(os.environ.get('COLUMNS', str(default_cols))), 374 int(os.environ.get('LINES', str(default_lines))), 375 ) 376 return _cols, _lines 377 378 379def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None): 380 """ 381 Iterate over a list in chunks. 382 https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks 383 384 Parameters 385 ---------- 386 iterable: Iterable[Any] 387 The iterable to iterate over in chunks. 388 389 chunksize: int 390 The size of chunks to iterate with. 
391 392 fillvalue: Optional[Any], default None 393 If the chunks do not evenly divide into the iterable, pad the end with this value. 394 395 Returns 396 ------- 397 A generator of tuples of size `chunksize`. 398 399 """ 400 from itertools import zip_longest 401 args = [iter(iterable)] * chunksize 402 return zip_longest(*args, fillvalue=fillvalue) 403 404def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]: 405 """ 406 Sort a dictionary's values and return a new dictionary. 407 408 Parameters 409 ---------- 410 d: Dict[Any, Any] 411 The dictionary to be sorted. 412 413 Returns 414 ------- 415 A sorted dictionary. 416 417 Examples 418 -------- 419 >>> sorted_dict({'b': 1, 'a': 2}) 420 {'b': 1, 'a': 2} 421 >>> sorted_dict({'b': 2, 'a': 1}) 422 {'a': 1, 'b': 2} 423 424 """ 425 try: 426 return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])} 427 except Exception: 428 return d 429 430def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]: 431 """ 432 Convert the standard pipes dictionary into a list. 433 434 Parameters 435 ---------- 436 pipes_dict: PipesDict 437 The pipes dictionary to be flattened. 438 439 Returns 440 ------- 441 A list of `Pipe` objects. 442 443 """ 444 pipes_list = [] 445 for ck in pipes_dict.values(): 446 for mk in ck.values(): 447 pipes_list += list(mk.values()) 448 return pipes_list 449 450 451def timed_input( 452 seconds: int = 10, 453 timeout_message: str = "", 454 prompt: str = "", 455 icon: bool = False, 456 **kw 457) -> Union[str, None]: 458 """ 459 Accept user input only for a brief period of time. 460 461 Parameters 462 ---------- 463 seconds: int, default 10 464 The number of seconds to wait. 465 466 timeout_message: str, default '' 467 The message to print after the window has elapsed. 468 469 prompt: str, default '' 470 The prompt to print during the window. 471 472 icon: bool, default False 473 If `True`, print the configured input icon. 
474 475 476 Returns 477 ------- 478 The input string entered by the user. 479 480 """ 481 import signal, time 482 483 class TimeoutExpired(Exception): 484 """Raise this exception when the timeout is reached.""" 485 486 def alarm_handler(signum, frame): 487 raise TimeoutExpired 488 489 # set signal handler 490 signal.signal(signal.SIGALRM, alarm_handler) 491 signal.alarm(seconds) # produce SIGALRM in `timeout` seconds 492 493 try: 494 return input(prompt) 495 except TimeoutExpired: 496 return None 497 except (EOFError, RuntimeError): 498 try: 499 print(prompt) 500 time.sleep(seconds) 501 except TimeoutExpired: 502 return None 503 finally: 504 signal.alarm(0) # cancel alarm 505 506 507def enforce_gevent_monkey_patch(): 508 """ 509 Check if gevent monkey patching is enabled, and if not, then apply patching. 510 """ 511 from meerschaum.utils.packages import attempt_import 512 import socket 513 gevent, gevent_socket, gevent_monkey = attempt_import( 514 'gevent', 'gevent.socket', 'gevent.monkey' 515 ) 516 if not socket.socket is gevent_socket.socket: 517 gevent_monkey.patch_all() 518 519def is_valid_email(email: str) -> Union['re.Match', None]: 520 """ 521 Check whether a string is a valid email. 522 523 Parameters 524 ---------- 525 email: str 526 The string to be examined. 527 528 Returns 529 ------- 530 None if a string is not in email format, otherwise a `re.Match` object, which is truthy. 531 532 Examples 533 -------- 534 >>> is_valid_email('foo') 535 >>> is_valid_email('foo@foo.com') 536 <re.Match object; span=(0, 11), match='foo@foo.com'> 537 538 """ 539 import re 540 regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$' 541 return re.search(regex, email) 542 543 544def string_width(string: str, widest: bool = True) -> int: 545 """ 546 Calculate the width of a string, either by its widest or last line. 547 548 Parameters 549 ---------- 550 string: str: 551 The string to be examined. 
552 553 widest: bool, default True 554 No longer used because `widest` is always assumed to be true. 555 556 Returns 557 ------- 558 An integer for the text's visual width. 559 560 Examples 561 -------- 562 >>> string_width('a') 563 1 564 >>> string_width('a\\nbc\\nd') 565 2 566 567 """ 568 def _widest(): 569 words = string.split('\n') 570 max_length = 0 571 for w in words: 572 length = len(w) 573 if length > max_length: 574 max_length = length 575 return max_length 576 577 return _widest() 578 579def _pyinstaller_traverse_dir( 580 directory: str, 581 ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), 582 include_dotfiles: bool = False 583) -> list: 584 """ 585 Recursively traverse a directory and return a list of its contents. 586 """ 587 import os, pathlib 588 paths = [] 589 _directory = pathlib.Path(directory) 590 591 def _found_pattern(name: str): 592 for pattern in ignore_patterns: 593 if pattern.replace('/', os.path.sep) in str(name): 594 return True 595 return False 596 597 for root, dirs, files in os.walk(_directory): 598 _root = str(root)[len(str(_directory.parent)):] 599 if _root.startswith(os.path.sep): 600 _root = _root[len(os.path.sep):] 601 if _root.startswith('.') and not include_dotfiles: 602 continue 603 ### ignore certain patterns 604 if _found_pattern(_root): 605 continue 606 607 for filename in files: 608 if filename.startswith('.') and not include_dotfiles: 609 continue 610 path = os.path.join(root, filename) 611 if _found_pattern(path): 612 continue 613 614 _path = str(path)[len(str(_directory.parent)):] 615 if _path.startswith(os.path.sep): 616 _path = _path[len(os.path.sep):] 617 _path = os.path.sep.join(_path.split(os.path.sep)[:-1]) 618 619 paths.append((path, _path)) 620 return paths 621 622 623def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any: 624 """ 625 Get a value from a dictionary with a tuple of keys. 
626 627 Parameters 628 ---------- 629 d: Dict[Any, Any] 630 The dictionary to search. 631 632 path: Tuple[Any, ...] 633 The path of keys to traverse. 634 635 Returns 636 ------- 637 The value from the end of the path. 638 """ 639 return functools.reduce(lambda di, key: di[key], path, d) 640 641 642def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None: 643 """ 644 Set a value in a dictionary with a tuple of keys. 645 646 Parameters 647 ---------- 648 d: Dict[Any, Any] 649 The dictionary to search. 650 651 path: Tuple[Any, ...] 652 The path of keys to traverse. 653 654 val: Any 655 The value to set at the end of the path. 656 """ 657 get_val_from_dict_path(d, path[:-1])[path[-1]] = val 658 659 660def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]: 661 """ 662 Recursively replace passwords in a dictionary. 663 664 Parameters 665 ---------- 666 d: Dict[str, Any] 667 The dictionary to search through. 668 669 replace_with: str, default '*' 670 The string to replace each character of the password with. 671 672 Returns 673 ------- 674 Another dictionary where values to the keys `'password'` 675 are replaced with `replace_with` (`'*'`). 
676 677 Examples 678 -------- 679 >>> replace_password({'a': 1}) 680 {'a': 1} 681 >>> replace_password({'password': '123'}) 682 {'password': '***'} 683 >>> replace_password({'nested': {'password': '123'}}) 684 {'nested': {'password': '***'}} 685 >>> replace_password({'password': '123'}, replace_with='!') 686 {'password': '!!!'} 687 688 """ 689 import copy 690 _d = copy.deepcopy(d) 691 for k, v in d.items(): 692 if isinstance(v, dict): 693 _d[k] = replace_password(v) 694 elif 'password' in str(k).lower(): 695 _d[k] = ''.join([replace_with for char in str(v)]) 696 elif str(k).lower() == 'uri': 697 from meerschaum.connectors.sql import SQLConnector 698 try: 699 uri_params = SQLConnector.parse_uri(v) 700 except Exception: 701 uri_params = None 702 if not uri_params: 703 continue 704 if not 'username' in uri_params or not 'password' in uri_params: 705 continue 706 _d[k] = v.replace( 707 uri_params['username'] + ':' + uri_params['password'], 708 uri_params['username'] + ':' + ''.join( 709 [replace_with for char in str(uri_params['password'])] 710 ) 711 ) 712 return _d 713 714 715def filter_arguments( 716 func: Callable[[Any], Any], 717 *args: Any, 718 **kwargs: Any 719) -> Tuple[Tuple[Any], Dict[str, Any]]: 720 """ 721 Filter out unsupported positional and keyword arguments. 722 723 Parameters 724 ---------- 725 func: Callable[[Any], Any] 726 The function to inspect. 727 728 *args: Any 729 Positional arguments to filter and pass to `func`. 730 731 **kwargs 732 Keyword arguments to filter and pass to `func`. 733 734 Returns 735 ------- 736 The `args` and `kwargs` accepted by `func`. 737 """ 738 args = filter_positionals(func, *args) 739 kwargs = filter_keywords(func, **kwargs) 740 return args, kwargs 741 742 743def filter_keywords( 744 func: Callable[[Any], Any], 745 **kw: Any 746) -> Dict[str, Any]: 747 """ 748 Filter out unsupported keyword arguments. 749 750 Parameters 751 ---------- 752 func: Callable[[Any], Any] 753 The function to inspect. 
754 755 **kw: Any 756 The arguments to be filtered and passed into `func`. 757 758 Returns 759 ------- 760 A dictionary of keyword arguments accepted by `func`. 761 762 Examples 763 -------- 764 ```python 765 >>> def foo(a=1, b=2): 766 ... return a * b 767 >>> filter_keywords(foo, a=2, b=4, c=6) 768 {'a': 2, 'b': 4} 769 >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6})) 770 8 771 ``` 772 773 """ 774 import inspect 775 func_params = inspect.signature(func).parameters 776 ### If the function has a **kw method, skip filtering. 777 for param, _type in func_params.items(): 778 if '**' in str(_type): 779 return kw 780 return {k: v for k, v in kw.items() if k in func_params} 781 782 783def filter_positionals( 784 func: Callable[[Any], Any], 785 *args: Any 786) -> Tuple[Any]: 787 """ 788 Filter out unsupported positional arguments. 789 790 Parameters 791 ---------- 792 func: Callable[[Any], Any] 793 The function to inspect. 794 795 *args: Any 796 The arguments to be filtered and passed into `func`. 797 NOTE: If the function signature expects more arguments than provided, 798 the missing slots will be filled with `None`. 799 800 Returns 801 ------- 802 A tuple of positional arguments accepted by `func`. 803 804 Examples 805 -------- 806 ```python 807 >>> def foo(a, b): 808 ... return a * b 809 >>> filter_positionals(foo, 2, 4, 6) 810 (2, 4) 811 >>> foo(*filter_positionals(foo, 2, 4, 6)) 812 8 813 ``` 814 815 """ 816 import inspect 817 from meerschaum.utils.warnings import warn 818 func_params = inspect.signature(func).parameters 819 acceptable_args: List[Any] = [] 820 821 def _warn_invalids(_num_invalid): 822 if _num_invalid > 0: 823 warn( 824 "Too few arguments were provided. 
" 825 + f"{_num_invalid} argument" 826 + ('s have ' if _num_invalid != 1 else " has ") 827 + " been filled with `None`.", 828 ) 829 830 num_invalid: int = 0 831 for i, (param, val) in enumerate(func_params.items()): 832 if '=' in str(val) or '*' in str(val): 833 _warn_invalids(num_invalid) 834 return tuple(acceptable_args) 835 836 try: 837 acceptable_args.append(args[i]) 838 except IndexError: 839 acceptable_args.append(None) 840 num_invalid += 1 841 842 _warn_invalids(num_invalid) 843 return tuple(acceptable_args) 844 845 846def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]: 847 """ 848 Convert an ordered dict to a dict. 849 Does not mutate the original OrderedDict. 850 """ 851 from collections import OrderedDict 852 _d = dict(od) 853 for k, v in od.items(): 854 if isinstance(v, OrderedDict) or ( 855 issubclass(type(v), OrderedDict) 856 ): 857 _d[k] = dict_from_od(v) 858 return _d 859 860 861def remove_ansi(s: str) -> str: 862 """ 863 Remove ANSI escape characters from a string. 864 865 Parameters 866 ---------- 867 s: str: 868 The string to be cleaned. 869 870 Returns 871 ------- 872 A string with the ANSI characters removed. 873 874 Examples 875 -------- 876 >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m") 877 'Hello, World!' 878 879 """ 880 import re 881 return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s) 882 883 884def get_connector_labels( 885 *types: str, 886 search_term: str = '', 887 ignore_exact_match = True, 888 _additional_options: Optional[List[str]] = None, 889) -> List[str]: 890 """ 891 Read connector labels from the configuration dictionary. 892 893 Parameters 894 ---------- 895 *types: str 896 The connector types. 897 If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`. 898 899 search_term: str, default '' 900 A filter on the connectors' labels. 901 902 ignore_exact_match: bool, default True 903 If `True`, skip a connector if the search_term is an exact match. 
904 905 Returns 906 ------- 907 A list of the keys of defined connectors. 908 909 """ 910 from meerschaum.config import get_config 911 connectors = get_config('meerschaum', 'connectors') 912 913 _types = list(types) 914 if len(_types) == 0: 915 _types = list(connectors.keys()) + ['plugin'] 916 917 conns = [] 918 for t in _types: 919 if t == 'plugin': 920 from meerschaum.plugins import get_data_plugins 921 conns += [ 922 f'{t}:' + plugin.module.__name__.split('.')[-1] 923 for plugin in get_data_plugins() 924 ] 925 continue 926 conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ] 927 928 if _additional_options: 929 conns += _additional_options 930 931 possibilities = [ 932 c 933 for c in conns 934 if c.startswith(search_term) 935 and c != ( 936 search_term if ignore_exact_match else '' 937 ) 938 ] 939 return sorted(possibilities) 940 941 942def wget( 943 url: str, 944 dest: Optional[Union[str, 'pathlib.Path']] = None, 945 headers: Optional[Dict[str, Any]] = None, 946 color: bool = True, 947 debug: bool = False, 948 **kw: Any 949) -> 'pathlib.Path': 950 """ 951 Mimic `wget` with `requests`. 952 953 Parameters 954 ---------- 955 url: str 956 The URL to the resource to be downloaded. 957 958 dest: Optional[Union[str, pathlib.Path]], default None 959 The destination path of the downloaded file. 960 If `None`, save to the current directory. 961 962 color: bool, default True 963 If `debug` is `True`, print color output. 964 965 debug: bool, default False 966 Verbosity toggle. 967 968 Returns 969 ------- 970 The path to the downloaded file. 
971 972 """ 973 from meerschaum.utils.warnings import warn, error 974 from meerschaum.utils.debug import dprint 975 import os, pathlib, re, urllib.request 976 if headers is None: 977 headers = {} 978 request = urllib.request.Request(url, headers=headers) 979 if not color: 980 dprint = print 981 if debug: 982 dprint(f"Downloading from '{url}'...") 983 try: 984 response = urllib.request.urlopen(request) 985 except Exception as e: 986 import ssl 987 ssl._create_default_https_context = ssl._create_unverified_context 988 try: 989 response = urllib.request.urlopen(request) 990 except Exception as _e: 991 print(_e) 992 response = None 993 if response is None or response.code != 200: 994 error_msg = f"Failed to download from '{url}'." 995 if color: 996 error(error_msg) 997 else: 998 print(error_msg) 999 import sys 1000 sys.exit(1) 1001 1002 d = response.headers.get('content-disposition', None) 1003 fname = ( 1004 re.findall("filename=(.+)", d)[0].strip('"') if d is not None 1005 else url.split('/')[-1] 1006 ) 1007 1008 if dest is None: 1009 dest = pathlib.Path(os.path.join(os.getcwd(), fname)) 1010 elif isinstance(dest, str): 1011 dest = pathlib.Path(dest) 1012 1013 with open(dest, 'wb') as f: 1014 f.write(response.fp.read()) 1015 1016 if debug: 1017 dprint(f"Downloaded file '{dest}'.") 1018 1019 return dest 1020 1021 1022def async_wrap(func): 1023 """ 1024 Run a synchronous function as async. 1025 https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn 1026 """ 1027 import asyncio 1028 from functools import wraps, partial 1029 1030 @wraps(func) 1031 async def run(*args, loop=None, executor=None, **kwargs): 1032 if loop is None: 1033 loop = asyncio.get_event_loop() 1034 pfunc = partial(func, *args, **kwargs) 1035 return await loop.run_in_executor(executor, pfunc) 1036 return run 1037 1038 1039def debug_trace(browser: bool = True): 1040 """ 1041 Open a web-based debugger to trace the execution of the program. 
1042 1043 This is an alias import for `meerschaum.utils.debug.debug_trace`. 1044 """ 1045 from meerschaum.utils.debug import trace 1046 trace(browser=browser) 1047 1048 1049def items_str( 1050 items: List[Any], 1051 quotes: bool = True, 1052 quote_str: str = "'", 1053 commas: bool = True, 1054 comma_str: str = ',', 1055 and_: bool = True, 1056 and_str: str = 'and', 1057 oxford_comma: bool = True, 1058 spaces: bool = True, 1059 space_str = ' ', 1060) -> str: 1061 """ 1062 Return a formatted string if list items separated by commas. 1063 1064 Parameters 1065 ---------- 1066 items: [List[Any]] 1067 The items to be printed as an English list. 1068 1069 quotes: bool, default True 1070 If `True`, wrap items in quotes. 1071 1072 quote_str: str, default "'" 1073 If `quotes` is `True`, prepend and append each item with this string. 1074 1075 and_: bool, default True 1076 If `True`, include the word 'and' before the final item in the list. 1077 1078 and_str: str, default 'and' 1079 If `and_` is True, insert this string where 'and' normally would in and English list. 1080 1081 oxford_comma: bool, default True 1082 If `True`, include the Oxford Comma (comma before the final 'and'). 1083 Only applies when `and_` is `True`. 1084 1085 spaces: bool, default True 1086 If `True`, separate items with `space_str` 1087 1088 space_str: str, default ' ' 1089 If `spaces` is `True`, separate items with this string. 1090 1091 Returns 1092 ------- 1093 A string of the items as an English list. 
1094 1095 Examples 1096 -------- 1097 >>> items_str([1,2,3]) 1098 "'1', '2', and '3'" 1099 >>> items_str([1,2,3], quotes=False) 1100 '1, 2, and 3' 1101 >>> items_str([1,2,3], and_=False) 1102 "'1', '2', '3'" 1103 >>> items_str([1,2,3], spaces=False, and_=False) 1104 "'1','2','3'" 1105 >>> items_str([1,2,3], oxford_comma=False) 1106 "'1', '2' and '3'" 1107 >>> items_str([1,2,3], quote_str=":") 1108 ':1:, :2:, and :3:' 1109 >>> items_str([1,2,3], and_str="or") 1110 "'1', '2', or '3'" 1111 >>> items_str([1,2,3], space_str="_") 1112 "'1',_'2',_and_'3'" 1113 1114 """ 1115 if not items: 1116 return '' 1117 1118 q = quote_str if quotes else '' 1119 s = space_str if spaces else '' 1120 a = and_str if and_ else '' 1121 c = comma_str if commas else '' 1122 1123 if len(items) == 1: 1124 return q + str(list(items)[0]) + q 1125 1126 if len(items) == 2: 1127 return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q 1128 1129 sep = q + c + s + q 1130 output = q + sep.join(str(i) for i in items[:-1]) + q 1131 if oxford_comma: 1132 output += c 1133 output += s + a + (s if and_ else '') + q + str(items[-1]) + q 1134 return output 1135 1136 1137def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str: 1138 """ 1139 Return a human-readable string for a `timedelta` (or `int` minutes). 1140 1141 Parameters 1142 ---------- 1143 delta: Union[timedelta, int] 1144 The interval to print. If `delta` is an integer, assume it corresponds to minutes. 1145 1146 round_unit: bool, default False 1147 If `True`, round the output to a single unit. 1148 1149 Returns 1150 ------- 1151 A formatted string, fit for human eyes. 
1152 """ 1153 from meerschaum.utils.packages import attempt_import 1154 if is_int(str(delta)) and not round_unit: 1155 return str(delta) 1156 1157 humanfriendly = attempt_import('humanfriendly', lazy=False) 1158 delta_seconds = ( 1159 delta.total_seconds() 1160 if hasattr(delta, 'total_seconds') 1161 else (delta * 60) 1162 ) 1163 1164 is_negative = delta_seconds < 0 1165 delta_seconds = abs(delta_seconds) 1166 replace_units = {} 1167 1168 if round_unit: 1169 if delta_seconds < 1: 1170 delta_seconds = round(delta_seconds, 2) 1171 elif delta_seconds < 60: 1172 delta_seconds = int(delta_seconds) 1173 elif delta_seconds < 3600: 1174 delta_seconds = int(delta_seconds / 60) * 60 1175 elif delta_seconds < 86400: 1176 delta_seconds = int(delta_seconds / 3600) * 3600 1177 elif delta_seconds < (86400 * 7): 1178 delta_seconds = int(delta_seconds / 86400) * 86400 1179 elif delta_seconds < (86400 * 7 * 4): 1180 delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7) 1181 elif delta_seconds < (86400 * 7 * 4 * 13): 1182 delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7) 1183 replace_units['weeks'] = 'months' 1184 else: 1185 delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364) 1186 1187 delta_str = humanfriendly.format_timespan(delta_seconds) 1188 if ',' in delta_str and round_unit: 1189 delta_str = delta_str.split(',')[0] 1190 elif ' and ' in delta_str and round_unit: 1191 delta_str = delta_str.split(' and ')[0] 1192 1193 for parsed_unit, replacement_unit in replace_units.items(): 1194 delta_str = delta_str.replace(parsed_unit, replacement_unit) 1195 1196 return delta_str + (' ago' if is_negative else '') 1197 1198 1199def is_docker_available() -> bool: 1200 """Check if we can connect to the Docker engine.""" 1201 import subprocess 1202 try: 1203 has_docker = subprocess.call( 1204 ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1205 ) == 0 1206 except Exception: 1207 has_docker = False 1208 return has_docker 1209 1210 
def is_android() -> bool:
    """Return `True` if the current platform is Android."""
    import sys
    return hasattr(sys, 'getandroidapilevel')


def is_bcp_available() -> bool:
    """Check if the MSSQL `bcp` utility is installed."""
    import subprocess

    try:
        has_bcp = subprocess.call(
            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        has_bcp = False
    return has_bcp


def is_systemd_available() -> bool:
    """Check if running on systemd."""
    import subprocess
    try:
        has_systemctl = subprocess.call(
            ['systemctl', 'whoami'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        ) == 0
    except FileNotFoundError:
        ### `systemctl` is not on the PATH: nothing unusual, stay quiet.
        has_systemctl = False
    except Exception:
        import traceback
        traceback.print_exc()
        has_systemctl = False
    return has_systemctl


def is_tmux_available() -> bool:
    """
    Check if `tmux` is installed.
    """
    import subprocess
    try:
        has_tmux = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        ### Any failure to launch (including a missing binary) means "not available".
        has_tmux = False
    return has_tmux


def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file by scanning backwards from the end.

    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read. It is opened in binary mode
        and each line is decoded with the default codec (UTF-8).

    N: int
        The number of lines to collect before stopping.

    Returns
    -------
    A list of strings in file order. If the file ends with a newline,
    the final element is an empty string.
    """
    import os
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        read_obj.seek(0, os.SEEK_END)
        ### Bytes of the current line, accumulated in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Restore byte order *before* decoding so multi-byte characters stay valid.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)

        ### The start of the file was reached: whatever remains is the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())

    return list(reversed(list_of_lines))


def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file object
        An open, seekable file object.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of trailing lines to skip before the returned window.

    Returns
    -------
    A tuple of `(lines, has_more)`.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    ### Nothing was requested: report only whether the file has any content.
    if to_read <= 0:
        f.seek(0, 2)
        return [], f.tell() > 0

    while True:
        try:
            ### Seek offsets must be integers (the estimate becomes a float after growing).
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### When the window starts mid-file, its first line may be partial,
        ### so require one extra line before trusting the last `to_read` lines.
        if pos == 0 or len(lines) > to_read:
            end = -offset if offset else None
            return lines[-to_read:end], len(lines) > to_read or pos > 0
        avg_line_length *= 1.3


def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    An `Exception` if every section is already a single character
    and the string is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'

    """
    if len(item) < max_len:
        return item

    def _shorten(s: str) -> str:
        return s[:-1] if len(s) > 1 else s

    ### Split on the requested delimiter (not a hard-coded underscore).
    sections = item.split(delimeter)
    available_chars = max_len - len(sections)

    sections_len = sum(len(s) for s in sections)
    while sections_len > available_chars:
        ### Trim one character from every section per round.
        sections = [_shorten(s) for s in sections]
        new_sections_len = sum(len(s) for s in sections)
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)


def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string to append to the truncated `text` to indicate truncation.

    Returns
    -------
    A string of length `max_length` or less
    (unless `suffix` itself is longer than `max_length`).
    """
    if len(text) <= max_length:
        return text

    ### Never slice with a negative bound, which would keep almost the whole string.
    keep_length = max(max_length - len(suffix), 0)
    return text[:keep_length] + suffix


def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Separate the negated values from the positive ones.
    Return two lists: positive and negative values.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Include values that start with this string in the second list.
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of `(included values, excluded values)`.
    Excluded values have the prefix removed and are returned as strings.
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    _in_vals, _ex_vals = [], []
    for v in vals:
        if str(v).startswith(negation_prefix):
            _ex_vals.append(str(v)[len(negation_prefix):])
        else:
            _in_vals.append(v)

    return _in_vals, _ex_vals


def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}
    return {
        col: separate_negation_values(
            (
                val
                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
                else [val]
            )
        )
        for col, val in params.items()
    }


def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Recursively flatten a list.

    This is a generator: it yields the leaf items lazily,
    descending only into nested `list` instances.
    """
    for item in list_:
        if isinstance(item, list):
            yield from flatten_list(item)
        else:
            yield item


def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The string is split on every comma, so literals which themselves
    contain commas (or positional strings containing `=`) are not supported.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    (123, 456), {'foo': 789, 'bar': 'baz'}
    """
    import ast
    args = []
    kwargs = {}

    for part in args_str.split(','):
        ### Skip empty segments (e.g. an empty string or a trailing comma).
        if not part.strip():
            continue
        if '=' in part:
            key, val = part.split('=', 1)
            kwargs[key.strip()] = ast.literal_eval(val)
        else:
            args.append(ast.literal_eval(part.strip()))

    return tuple(args), kwargs


def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path.

    dest_path: pathlib.Path
        The destination path.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        msg = str(e)
    else:
        return True, "Success"

    ### Failed to create a symlink.
    ### If we're not on Windows, return an error.
    import platform
    if platform.system() != 'Windows':
        return False, msg

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories can be linked with a junction instead of a symlink.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)

    return True, "Success"


def is_symlink(path: pathlib.Path) -> bool:
    """
    Wrap `path.is_symlink()` but add support for Windows junctions.
    """
    if path.is_symlink():
        return True
    import platform
    import os
    if platform.system() != 'Windows':
        return False
    try:
        return bool(os.readlink(path))
    except OSError:
        return False


def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl
    return layer


def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member would be written outside of `output_dir`.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def is_within_directory(target) -> bool:
        abs_target = os.path.abspath(target)
        try:
            ### Compare whole path components: a character-wise prefix would
            ### accept siblings such as `/out_evil` for the directory `/out`.
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            ### Raised when the paths cannot be compared (e.g. different drives).
            return False

    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)


def to_snake_case(name: str) -> str:
    """
    Return the given string in snake-case-style.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re
    ### Insert underscores at lower-to-upper boundaries, drop punctuation,
    ### then collapse whitespace into underscores.
    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r'\s+', '_', name)
    return name.lower()


##################
# Legacy imports #
##################

def choose_subaction(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _choose_subactions
    return _choose_subactions(*args, **kwargs)


def print_options(*args, **kwargs) -> None:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.formatting.print_options`.
    """
    from meerschaum.utils.formatting import print_options as _print_options
    return _print_options(*args, **kwargs)


def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype
    return _to_pandas_dtype(*args, **kwargs)


def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as real_function
    return real_function(*args, **kwargs)


def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function
    return real_function(*args, **kwargs)


def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as real_function
    return real_function(*args, **kwargs)


def df_from_literal(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as real_function
    return real_function(*args, **kwargs)


def get_json_cols(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as real_function
    return real_function(*args, **kwargs)


def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as real_function
    return real_function(*args, **kwargs)


def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as real_function
    return real_function(*args, **kwargs)


def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function
    return real_function(*args, **kwargs)


def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as real_function
    return real_function(*args, **kwargs)


def choices_docstring(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as real_function
    return real_function(*args, **kwargs)


def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions._get_subaction_names`.
    """
    from meerschaum.actions import _get_subaction_names as real_function
    return real_function(*args, **kwargs)


def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime
    return serialize_datetime(dt)


def replace_pipes_in_dict(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.replace_pipes_in_dict`.
    """
    from meerschaum.utils.pipes import replace_pipes_in_dict
    return replace_pipes_in_dict(*args, **kwargs)


def is_pipe_registered(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.is_pipe_registered`.
    """
    from meerschaum.utils.pipes import is_pipe_registered
    return is_pipe_registered(*args, **kwargs)


def round_time(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dtypes.round_time`.
    """
    from meerschaum.utils.dtypes import round_time
    return round_time(*args, **kwargs)


### Export every public callable defined in this module
### which is not explicitly listed in `__pdoc__`.
_current_module = sys.modules[__name__]
__all__ = tuple(
    name
    for name, obj in globals().items()
    if callable(obj)
    and name not in __pdoc__
    and getattr(obj, '__module__', None) == _current_module.__name__
    and not name.startswith('_')
)
def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Attach `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or object) which receives the new attribute.

    method_name: Optional[str], default None
        The attribute name to use. `None` falls back to `func.__name__`.

    keep_self: Optional[bool], default None
        If `False`, or if left as `None` while `class_def` is a class,
        the attached callable discards its first (`self`) argument before calling `func`.
        Otherwise `func` itself is attached unchanged.

    Returns
    -------
    The original `func` object.

    """
    discard_self = keep_self is False or (
        keep_self is None and isinstance(class_def, type)
    )

    if discard_self:
        @functools.wraps(func)
        def attached(self, *args, **kw):
            return func(*args, **kw)
    else:
        attached = func

    attribute_name = func.__name__ if method_name is None else method_name
    setattr(class_def, attribute_name, attached)
    return func
Add function `func` to class `class_def`.
Parameters
- func (Callable[[Any], Any]): Function to be added as a method of the class
- class_def (Class): Class to be modified.
- method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
- The modified function object.
def generate_password(length: int = 12) -> str:
    """
    Build a random alphanumeric password with the `secrets` module.

    Parameters
    ----------
    length: int, default 12
        How many characters the password should contain.

    Returns
    -------
    A string of `length` random ASCII letters and digits.
    """
    import secrets
    import string

    characters = []
    for _ in range(length):
        characters.append(secrets.choice(string.ascii_letters + string.digits))
    return ''.join(characters)
Generate a secure password of given length.
Parameters
- length (int, default 12): The length of the password.
Returns
- A random password string.
def is_int(s: str) -> bool:
    """
    Determine whether a value represents a whole number.

    Parameters
    ----------
    s: str
        The value to test.

    Returns
    -------
    `True` if `s` can be converted with `float()` and the result has no fractional part
    (so `'5.0'` counts as an integer), otherwise `False`.

    """
    try:
        number = float(s)
    except Exception:
        return False
    return number.is_integer()
Check if string is an int.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string was able to be cast to an integer.
def is_uuid(s: str) -> bool:
    """
    Determine whether a value can be parsed as a UUID.

    Parameters
    ----------
    s: str
        The value to test (it is converted with `str()` first).

    Returns
    -------
    `True` if `uuid.UUID` accepts the value, otherwise `False`.
    """
    import uuid

    candidate = str(s)
    try:
        uuid.UUID(candidate)
    except Exception:
        return False
    else:
        return True
Check if a string is a valid UUID.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string is a valid UUID.
def string_to_dict(params_string: str) -> Dict[str, Any]:
    """
    Convert a string into a dictionary.

    Strings that start with `'{'` are parsed as JSON.
    Anything else is treated as comma-separated `key:value` pairs,
    where commas inside quotes, brackets, or braces do not split items.

    Parameters
    ----------
    params_string: str
        The text to parse.

    Returns
    -------
    The resulting dictionary (empty if `params_string` is empty).
    Values are evaluated as Python literals where possible, otherwise kept as strings.
    Items without a colon are skipped.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if not params_string:
        return {}

    import json

    ### Edge case: escaping in the generated compose file may leave the JSON
    ### wrapped in one extra character (a single quote) on each side.
    has_wrapper_chars = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if has_wrapper_chars:
        return json.loads(params_string[1:-1])

    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    ### Pass 1: split on top-level commas only.
    pieces = []
    pending = ''
    open_quote = None
    square_depth = 0
    curly_depth = 0
    for index, char in enumerate(params_string):
        if open_quote is not None:
            ### Only an unescaped matching quote closes the quoted section.
            if char == open_quote and (index == 0 or params_string[index - 1] != '\\'):
                open_quote = None
        elif char in ('"', "'"):
            open_quote = char
        elif char == '[':
            square_depth += 1
        elif char == ']':
            square_depth -= 1
        elif char == '{':
            curly_depth += 1
        elif char == '}':
            curly_depth -= 1
        elif char == ',' and square_depth == 0 and curly_depth == 0:
            pieces.append(pending)
            pending = ''
            continue
        pending += char

    if pending:
        pieces.append(pending)

    ### Pass 2: split each piece at its first colon and evaluate the value.
    parsed = {}
    for piece in pieces:
        piece = piece.strip()
        if not piece:
            continue

        key, colon, raw_value = piece.partition(":")
        if not colon:
            continue

        try:
            value = ast.literal_eval(raw_value)
        except Exception:
            value = str(raw_value)

        parsed[key] = value

    return parsed
Parse a string into a dictionary.
If the string begins with '{', parse as JSON. Otherwise use simple parsing.
Parameters
- params_string (str): The string to be parsed.
Returns
- The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2")
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def to_simple_dict(doc: Dict[str, Any]) -> str:
    """
    Render a dictionary as a comma-separated string of `key:value` pairs.

    Strings are written as-is unless they would be read back as a non-string literal
    (e.g. `'1'`), in which case they are JSON-encoded (quoted).
    All other values are JSON-encoded compactly.
    """
    import json
    import ast
    from meerschaum.utils.dtypes import json_serialize_value

    def _dump(value):
        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)

    def _render(value):
        if not isinstance(value, str):
            return _dump(value)

        try:
            literal = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError):
            ### Not a Python literal, so the bare text is unambiguous.
            return value

        if isinstance(literal, str):
            return value
        return _dump(value)

    pairs = [f"{key}:{_render(val)}" for key, val in doc.items()]
    return ','.join(pairs)
Serialize a document dictionary in simple-dict format.
def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':',
) -> List[Any]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The text to parse.

    leading_key: str, default 'MRSM'
        The prefix which marks a substitution.

    begin_key: str, default '{'
        The opening token which follows `leading_key`.

    end_key: str, default '}'
        The closing token at the end of `value`.

    delimeter: str, default ':'
        The separator between the keys inside of the substitution.

    Returns
    -------
    The list of keys between `begin_key` and `end_key`.
    If `value` does not start with `leading_key`, `value` is returned unchanged.
    """
    if not value.startswith(leading_key):
        return value

    ### Drop the prefix, then strip the opening and closing tokens by length.
    remainder = value[len(leading_key):]
    inner = remainder[len(begin_key):len(remainder) - len(end_key)]
    return inner.split(delimeter)
Parse Meerschaum substitution syntax.
E.g. MRSM{value1:value2} => ['value1', 'value2']
NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
def edit_file(
    path: Union['pathlib.Path', str],
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Launch an editor on a file.

    The editor named by `$EDITOR` is tried first (falling back to `default_editor`
    when the variable is unset). If launching it raises, `pyvim` is run instead.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The file to open.

    default_editor: str, default 'pyvim'
        The editor command to use when `$EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if the editor process exited with a return code of 0.
    """
    import os
    import subprocess
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor}'...")
        return_code = subprocess.call([editor, path])
    except Exception as e:
        ### The system editor could not be launched.
        if debug:
            dprint(str(e))
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        return_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return return_code == 0
Open a file for editing.
Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
Parameters
- path (Union[pathlib.Path, str]): The path to the file to be edited.
- default_editor (str, default 'pyvim'): If `$EDITOR` is not set, use this instead. If `pyvim` is not installed, it will install it from PyPI.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating the file was successfully edited.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Report the terminal's size as columns and lines.

    If the terminal cannot be queried, the `COLUMNS` and `LINES`
    environment variables are consulted before using the defaults.

    Parameters
    ----------
    default_cols: int, default 100
        The fallback number of columns.

    default_lines: int, default 120
        The fallback number of lines.

    Returns
    -------
    A tuple of integers: `(columns, lines)`.
    """
    import os

    try:
        terminal_size = os.get_terminal_size()
        return terminal_size.columns, terminal_size.lines
    except Exception:
        columns = int(os.environ.get('COLUMNS', str(default_cols)))
        lines = int(os.environ.get('LINES', str(default_lines)))
        return columns, lines
Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).
Parameters
- default_cols (int, default 100): If the columns cannot be determined, return this value.
- default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
- A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Walk through an iterable in fixed-size groups.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The items to group.

    chunksize: int
        The number of items per group.

    fillvalue: Optional[Any], default None
        The value used to pad the final group when it would otherwise be short.

    Returns
    -------
    An iterator of tuples, each of length `chunksize`.

    """
    import itertools

    ### Every slot pulls from the same iterator,
    ### so each output tuple consumes `chunksize` consecutive items.
    shared_iterator = iter(iterable)
    slots = itertools.repeat(shared_iterator, chunksize)
    return itertools.zip_longest(*slots, fillvalue=fillvalue)
Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
Parameters
- iterable (Iterable[Any]): The iterable to iterate over in chunks.
- chunksize (int): The size of chunks to iterate with.
- fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
- A generator of tuples of size `chunksize`.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Build a new dictionary whose items are ordered by value.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to reorder.

    Returns
    -------
    A new dictionary ordered by ascending value.
    If sorting fails (e.g. the values cannot be compared), `d` itself is returned.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    def _value_of(pair):
        return pair[1]

    try:
        ordered_pairs = sorted(d.items(), key=_value_of)
        return dict(ordered_pairs)
    except Exception:
        return d
Sort a dictionary's values and return a new dictionary.
Parameters
- d (Dict[Any, Any]): The dictionary to be sorted.
Returns
- A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collapse a three-level pipes dictionary into a flat list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The nested dictionary whose innermost values are collected.

    Returns
    -------
    A list of the innermost values (`Pipe` objects), in iteration order.

    """
    return [
        pipe
        for first_level in pipes_dict.values()
        for second_level in first_level.values()
        for pipe in second_level.values()
    ]
Convert the standard pipes dictionary into a list.
Parameters
- pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
- A list of `Pipe` objects.
def timed_input(
    seconds: int = 10,
    timeout_message: str = "",
    prompt: str = "",
    icon: bool = False,
    **kw
) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    The timeout is implemented with `signal.SIGALRM`,
    so this only works where that signal is available.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        NOTE: accepted for compatibility but not currently used by this function.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        If `True`, print the configured input icon.
        NOTE: accepted for compatibility but not currently used by this function.

    Returns
    -------
    The input string entered by the user, or `None` if the window elapsed.

    """
    import signal
    import time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Remember the handler being replaced so that it can be restored afterwards.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(seconds)  # produce SIGALRM in `seconds` seconds

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        ### Input is unavailable: show the prompt and wait out the window.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            return None
        return None
    finally:
        signal.alarm(0)  # cancel alarm
        ### `previous_handler` is `None` if the old handler was not installed from Python.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
Accept user input only for a brief period of time.
Parameters
- seconds (int, default 10): The number of seconds to wait.
- timeout_message (str, default ''): The message to print after the window has elapsed.
- prompt (str, default ''): The prompt to print during the window.
- icon (bool, default False): If `True`, print the configured input icon.
Returns
- The input string entered by the user.
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    import socket
    from meerschaum.utils.packages import attempt_import

    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    already_patched = socket.socket is gevent_socket.socket
    if already_patched:
        return
    gevent_monkey.patch_all()
Check if gevent monkey patching is enabled, and if not, then apply patching.
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Test a string against a simple email pattern.

    Parameters
    ----------
    email: str
        The text to test.

    Returns
    -------
    A (truthy) `re.Match` object if `email` matches the pattern, otherwise `None`.
    The pattern only accepts lowercase letters and digits in the local part
    and a final domain label of two or three characters.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>

    """
    import re

    email_pattern = re.compile(r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
    return email_pattern.search(email)
Check whether a string is a valid email.
Parameters
- email (str): The string to be examined.
Returns
- None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
    """
    Measure the width of a (possibly multi-line) string by its longest line.

    Parameters
    ----------
    string: str
        The text to measure.

    widest: bool, default True
        No longer used: the widest line is always measured.

    Returns
    -------
    The number of characters in the longest line of `string`.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    line_lengths = (len(line) for line in string.split('\n'))
    return max(line_lengths, default=0)
Calculate the visual width of a string, measured by its widest line.
Parameters
- string (str): The string to be examined.
- widest (bool, default True): No longer used because `widest` is always assumed to be true.
Returns
- An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
    """
    Walk a nested dictionary along a tuple of keys and return what is found.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to search.

    path: Tuple[Any, ...]
        The keys to follow, outermost first.
        An empty path returns `d` itself.

    Returns
    -------
    The value from the end of the path.
    """
    current = d
    for key in path:
        current = current[key]
    return current
Get a value from a dictionary with a tuple of keys.
Parameters
- d (Dict[Any, Any]): The dictionary to search.
- path (Tuple[Any, ...]): The path of keys to traverse.
Returns
- The value from the end of the path.
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
    """
    Assign a value inside a nested dictionary, addressed by a tuple of keys.

    Every key except the last must already exist; the last key is created
    or overwritten on the container found at the end of the parent path.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to modify in place.

    path: Tuple[Any, ...]
        The keys to follow, outermost first.

    val: Any
        The value to set at the end of the path.
    """
    parent = get_val_from_dict_path(d, path[:-1])
    last_key = path[-1]
    parent[last_key] = val
Set a value in a dictionary with a tuple of keys.
Parameters
- d (Dict[Any, Any]): The dictionary to search.
- path (Tuple[Any, ...]): The path of keys to traverse.
- val (Any): The value to set at the end of the path.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through. It is not modified.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        The same string is used at every nesting level.

    Returns
    -------
    A deep copy of `d` in which:

    - values of keys containing `'password'` (case-insensitive)
      are masked with `replace_with`,
    - the password portion of `'uri'` values is masked when the URI
      can be parsed and contains both a username and password,
    - nested dictionaries are processed the same way.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy

    def _mask(secret: Any) -> str:
        """Return `replace_with` repeated once per character of `secret`."""
        return replace_with * len(str(secret))

    _d = copy.deepcopy(d)
    for k, v in d.items():
        key = str(k).lower()
        if isinstance(v, dict):
            ### Forward `replace_with` so nested passwords use the caller's mask.
            _d[k] = replace_password(v, replace_with=replace_with)
        elif 'password' in key:
            _d[k] = _mask(v)
        elif key == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(v)
            except Exception:
                ### Best-effort: leave unparseable URIs untouched.
                uri_params = None
            if not uri_params:
                continue
            if 'username' not in uri_params or 'password' not in uri_params:
                continue
            _d[k] = v.replace(
                uri_params['username'] + ':' + uri_params['password'],
                uri_params['username'] + ':' + _mask(uri_params['password']),
            )
    return _d
Recursively replace passwords in a dictionary.
Parameters
- d (Dict[str, Any]): The dictionary to search through.
- replace_with (str, default '*'): The string to replace each character of the password with.
Returns
- Another dictionary where the values of the `'password'` keys are replaced with `replace_with` (`'*'`).
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Keep only the positional and keyword arguments that `func` can accept.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter (via `filter_positionals`).

    **kwargs
        Keyword arguments to filter (via `filter_keywords`).

    Returns
    -------
    A two-tuple of the accepted `args` and the accepted `kwargs`.
    """
    accepted_args = filter_positionals(func, *args)
    accepted_kwargs = filter_keywords(func, **kwargs)
    return accepted_args, accepted_kwargs
Filter out unsupported positional and keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): Positional arguments to filter and pass to `func`.
- **kwargs: Keyword arguments to filter and pass to `func`.
Returns
- The `args` and `kwargs` accepted by `func`.
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, `kw` is returned unfiltered.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters

    ### Check the parameter kind instead of searching the text for '**',
    ### which also matched defaults or annotations containing '**'.
    accepts_var_keywords = any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    )
    if accepts_var_keywords:
        return kw
    return {k: v for k, v in kw.items() if k in func_params}
Filter out unsupported keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- **kw (Any): The arguments to be filtered and passed into `func`.
Returns
- A dictionary of keyword arguments accepted by `func`.
Examples
>>> def foo(a=1, b=2):
... return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any, ...]:
    """
    Filter out unsupported positional arguments.

    Only the leading required positional parameters of `func` are considered:
    collection stops at the first parameter which has a default value or which
    is not positional (`*args`, keyword-only, or `**kwargs`).

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )

    acceptable_args = []
    num_invalid = 0
    for i, param in enumerate(func_params.values()):
        ### Inspect the parameter itself rather than its string form:
        ### required keyword-only parameters render without '=' or '*'
        ### and used to be mistaken for positionals.
        if (
            param.kind not in positional_kinds
            or param.default is not inspect.Parameter.empty
        ):
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Import lazily so that the common (no warning) path has no project dependency.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have' if num_invalid != 1 else ' has')
            + " been filled with `None`.",
        )

    return tuple(acceptable_args)
Filter out unsupported positional arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): The arguments to be filtered and passed into `func`. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with `None`.
Returns
- A tuple of positional arguments accepted by `func`.
Examples
>>> def foo(a, b):
... return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an `OrderedDict`, converting nested
    `OrderedDict` values (and subclasses) recursively.

    The original `OrderedDict` is left untouched.
    Values which are not `OrderedDict` instances are carried over as-is.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }
Convert an ordered dict to a dict. Does not mutate the original OrderedDict.
def remove_ansi(s: str) -> str:
    r"""
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI escape sequences removed.

    Examples
    --------
    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
    'Hello, World!'

    """
    import re
    ansi_pattern = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    return ansi_pattern.sub('', s)
Remove ANSI escape characters from a string.
Parameters
- s (str): The string to be cleaned.
Returns
- A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match = True,
    _additional_options: Optional[List[str]] = None,
) -> List[str]:
    """
    Collect connector keys (`type:label`) from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use every type defined in the configuration plus `'plugin'`.

    search_term: str, default ''
        Only keys which start with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    _additional_options: Optional[List[str]], default None
        Extra keys to include as candidates before filtering.

    Returns
    -------
    A sorted list of the keys of defined connectors.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types) or (list(connectors.keys()) + ['plugin'])

    labels = []
    for conn_type in requested_types:
        if conn_type == 'plugin':
            from meerschaum.plugins import get_data_plugins
            labels.extend(
                f'{conn_type}:' + plugin.module.__name__.split('.')[-1]
                for plugin in get_data_plugins()
            )
        else:
            ### The 'default' entry is skipped when listing labels.
            labels.extend(
                f'{conn_type}:{label}'
                for label in connectors.get(conn_type, {})
                if label != 'default'
            )

    if _additional_options:
        labels.extend(_additional_options)

    excluded_key = search_term if ignore_exact_match else ''
    return sorted(
        key
        for key in labels
        if key.startswith(search_term) and key != excluded_key
    )
Read connector labels from the configuration dictionary.
Parameters
- *types (str): The connector types. If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
- search_term (str, default ''): A filter on the connectors' labels.
- ignore_exact_match (bool, default True): If `True`, skip a connector if the search_term is an exact match.
Returns
- A list of the keys of defined connectors.
def wget(
    url: str,
    dest: Optional[Union[str, 'pathlib.Path']] = None,
    headers: Optional[Dict[str, Any]] = None,
    color: bool = True,
    debug: bool = False,
    **kw: Any
) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib.request`.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory, using the file name from the
        `Content-Disposition` header or else the last segment of the URL.

    headers: Optional[Dict[str, Any]], default None
        HTTP headers to send with the request.

    color: bool, default True
        If `debug` is `True`, print color output.
        Also selects how a failure is reported: through `error()` when `True`,
        otherwise by printing the message and exiting with status 1.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Accepted for compatibility and ignored.

    Returns
    -------
    The path to the downloaded file.

    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    import os
    import pathlib
    import re
    import shutil
    import urllib.request

    if headers is None:
        headers = {}
    request = urllib.request.Request(url, headers=headers)
    if not color:
        dprint = print
    if debug:
        dprint(f"Downloading from '{url}'...")

    try:
        response = urllib.request.urlopen(request)
    except Exception:
        ### Best-effort retry without certificate verification.
        ### NOTE: The unverified context is scoped to this one request; patching
        ### `ssl._create_default_https_context` would disable verification
        ### for every later HTTPS request in the process.
        import ssl
        unverified_context = ssl.create_default_context()
        unverified_context.check_hostname = False
        unverified_context.verify_mode = ssl.CERT_NONE
        try:
            response = urllib.request.urlopen(request, context=unverified_context)
        except Exception as _e:
            print(_e)
            response = None

    try:
        if response is None or response.code != 200:
            error_msg = f"Failed to download from '{url}'."
            if color:
                ### NOTE(review): `error()` is presumed to raise; confirm.
                error(error_msg)
            else:
                print(error_msg)
                import sys
                sys.exit(1)

        disposition = response.headers.get('content-disposition', None)
        header_names = (
            re.findall("filename=(.+)", disposition)
            if disposition is not None
            else []
        )
        if header_names:
            ### The header is server-controlled: keep only the final path component
            ### so the file cannot be written outside of the current directory.
            fname = os.path.basename(header_names[0].strip('"'))
        else:
            fname = url.split('/')[-1]

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        ### Read through the response object (not its raw `fp`) so that
        ### chunked transfer encoding is decoded, and stream to disk in pieces.
        with open(dest, 'wb') as f:
            shutil.copyfileobj(response, f)
    finally:
        if response is not None:
            response.close()

    if debug:
        dprint(f"Downloaded file '{dest}'.")

    return dest
Mimic `wget` with `urllib.request`.
Parameters
- url (str): The URL to the resource to be downloaded.
- dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If `None`, save to the current directory.
- color (bool, default True): If `debug` is `True`, print color output.
- debug (bool, default False): Verbosity toggle.
Returns
- The path to the downloaded file.
def async_wrap(func):
    """
    Wrap a synchronous function so that it may be awaited.

    The returned coroutine function runs `func` in an executor of the event loop.
    Two extra keyword arguments are consumed by the wrapper and not passed to `func`:
    `loop` (defaults to `asyncio.get_event_loop()`) and
    `executor` (defaults to `None`, i.e. the loop's default executor).

    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
    """
    import asyncio
    from functools import partial, wraps

    async def run(*args, loop=None, executor=None, **kwargs):
        event_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = partial(func, *args, **kwargs)
        result = await event_loop.run_in_executor(executor, bound_call)
        return result

    return wraps(func)(run)
Run a synchronous function as async. https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
def debug_trace(browser: bool = True):
    """
    Open a web-based debugger to trace the execution of the program.

    This is a thin wrapper which forwards `browser` to `meerschaum.utils.debug.trace`.
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
Open a web-based debugger to trace the execution of the program.
This is an alias import for `meerschaum.utils.debug.trace`.
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: List[Any]
        The items to be printed as an English list.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    ### Materialize once so that sets and other non-sliceable iterables are supported.
    quoted = [q + str(item) + q for item in items]
    if not quoted:
        return ''
    if len(quoted) == 1:
        return quoted[0]

    separator = c + s

    ### Without 'and', every item (including the last) is joined the same way,
    ### regardless of `oxford_comma`.
    if not and_:
        return separator.join(quoted)

    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = separator.join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]
Return a formatted string of list items separated by commas.
Parameters
- items (List[Any]): The items to be printed as an English list.
- quotes (bool, default True): If `True`, wrap items in quotes.
- quote_str (str, default "'"): If `quotes` is `True`, prepend and append each item with this string.
- and_ (bool, default True): If `True`, include the word 'and' before the final item in the list.
- and_str (str, default 'and'): If `and_` is `True`, insert this string where 'and' normally would in an English list.
- oxford_comma (bool, default True): If `True`, include the Oxford Comma (comma before the final 'and'). Only applies when `and_` is `True`.
- spaces (bool, default True): If `True`, separate items with `space_str`.
- space_str (str, default ' '): If `spaces` is `True`, separate items with this string.
Returns
- A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
        NOTE: When `delta` parses as an integer and `round_unit` is `False`,
        `str(delta)` is returned unformatted.

    round_unit: bool, default False
        If `True`, round the output (down) to a single unit.

    Returns
    -------
    A formatted string, fit for human eyes.
    Negative intervals are suffixed with `' ago'`.
    """
    from meerschaum.utils.packages import attempt_import
    if is_int(str(delta)) and not round_unit:
        return str(delta)

    humanfriendly = attempt_import('humanfriendly', lazy=False)
    delta_seconds = (
        delta.total_seconds()
        if hasattr(delta, 'total_seconds')
        else (delta * 60)
    )

    is_negative = delta_seconds < 0
    delta_seconds = abs(delta_seconds)
    replace_units = {}

    minute = 60
    hour = 3600
    day = 86400
    week = day * 7
    ### A "month" here is four weeks and a "year" is 52 weeks (364 days).
    month = week * 4
    year = day * 364

    if round_unit:
        if delta_seconds < 1:
            delta_seconds = round(delta_seconds, 2)
        elif delta_seconds < minute:
            delta_seconds = int(delta_seconds)
        elif delta_seconds < hour:
            delta_seconds = int(delta_seconds / minute) * minute
        elif delta_seconds < day:
            delta_seconds = int(delta_seconds / hour) * hour
        elif delta_seconds < week:
            delta_seconds = int(delta_seconds / day) * day
        elif delta_seconds < month:
            delta_seconds = int(delta_seconds / week) * week
        elif delta_seconds < year:
            ### Express N months as N weeks, then swap the unit name below.
            ### The singular form must be swapped too,
            ### otherwise one month would be printed as "1 week".
            delta_seconds = int(delta_seconds / month) * week
            replace_units['weeks'] = 'months'
            replace_units['week'] = 'month'
        else:
            delta_seconds = int(delta_seconds / year) * year

    delta_str = humanfriendly.format_timespan(delta_seconds)
    if ',' in delta_str and round_unit:
        delta_str = delta_str.split(',')[0]
    elif ' and ' in delta_str and round_unit:
        delta_str = delta_str.split(' and ')[0]

    ### Insertion order matters: the plural is replaced before the singular.
    for parsed_unit, replacement_unit in replace_units.items():
        delta_str = delta_str.replace(parsed_unit, replacement_unit)

    return delta_str + (' ago' if is_negative else '')
Return a human-readable string for a `timedelta` (or `int` minutes).
Parameters
- delta (Union[timedelta, int]): The interval to print. If `delta` is an integer, assume it corresponds to minutes.
- round_unit (bool, default False): If `True`, round the output to a single unit.
Returns
- A formatted string, fit for human eyes.
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Runs `docker ps` with its output discarded and reports
    whether the command exited with status 0.
    Any exception while launching the command counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if we can connect to the Docker engine.
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    Detection relies on the presence of `sys.getandroidapilevel`.
    """
    import sys as _sys
    return hasattr(_sys, 'getandroidapilevel')
Return `True` if the current platform is Android.
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Runs `bcp -v` with its output discarded and reports
    whether the command exited with status 0.
    Any exception while launching the command counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if the MSSQL `bcp` utility is installed.
def is_systemd_available() -> bool:
    """
    Check if running on systemd.

    Runs `systemctl whoami` with its output discarded and reports whether it
    exited with status 0. A missing `systemctl` binary quietly yields `False`;
    any other exception prints its traceback and also yields `False`.
    """
    import subprocess
    command = ['systemctl', 'whoami']
    try:
        exit_code = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except FileNotFoundError:
        return False
    except Exception:
        import traceback
        traceback.print_exc()
        return False
    return exit_code == 0
Check if running on systemd.
def is_tmux_available() -> bool:
    """
    Check if `tmux` is installed.

    Runs `tmux -V` with its output discarded and reports
    whether the command exited with status 0.
    Any exception (including a missing binary) counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if `tmux` is installed.
def get_last_n_lines(file_name: str, N: int) -> List[str]:
    """
    Return the last `N` lines of a file, like the `tail` command.

    The file is opened in binary mode and scanned backwards byte-by-byte;
    each line is decoded with the default codec (UTF-8).

    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read.

    N: int
        The number of trailing lines to return.
        If the file has fewer lines, all of them are returned.

    Returns
    -------
    A list of lines (without newline characters), in file order.
    NOTE: A trailing newline at the end of the file counts as an empty final line.
    """
    import os
    # Lines are collected last-to-first and reversed before returning.
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        # Start from the end of the file.
        read_obj.seek(0, os.SEEK_END)
        # Bytes of the current line, accumulated in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        # Walk backwards until the first byte of the file has been read.
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location = pointer_location - 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                # Restore byte order BEFORE decoding: reversing the decoded text
                # instead would scramble multi-byte UTF-8 sequences.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # Whatever remains in the buffer is the first line of the file.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f:
        A seekable file object (typically opened in binary mode).

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines at the very end of the file to skip.

    Returns
    -------
    A tuple of the list of lines (as returned by `splitlines()`)
    and the `has_more` flag.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    ### Nothing to read: report only whether the file has content.
    if to_read <= 0:
        f.seek(0, 2)
        return [], f.tell() > 0

    while True:
        try:
            ### `seek` requires an integer, but the estimate becomes a float after growing.
            f.seek(-int(avg_line_length * to_read), 2)
        except OSError:
            ### The file is smaller than the step back, so read from the beginning instead.
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### Unless reading began at the start of the file, the first line may be
        ### a fragment, so require one spare line before trusting the window.
        if len(lines) > to_read or pos == 0:
            end = -offset if offset else None
            return lines[-to_read:end], len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
https://stackoverflow.com/a/692616/9699829
Reads n lines from f with an offset of offset lines. The return value is a tuple in the form `(lines, has_more)` where `has_more` is an indicator that is `True` if there are more lines in the file.
1334def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str: 1335 """ 1336 Remove characters from each section of a string until the length is within the limit. 1337 1338 Parameters 1339 ---------- 1340 item: str 1341 The item name to be truncated. 1342 1343 delimeter: str, default '_' 1344 Split `item` by this string into several sections. 1345 1346 max_len: int, default 128 1347 The max acceptable length of the truncated version of `item`. 1348 1349 Returns 1350 ------- 1351 The truncated string. 1352 1353 Examples 1354 -------- 1355 >>> truncate_string_sections('abc_def_ghi', max_len=10) 1356 'ab_de_gh' 1357 1358 """ 1359 if len(item) < max_len: 1360 return item 1361 1362 def _shorten(s: str) -> str: 1363 return s[:-1] if len(s) > 1 else s 1364 1365 sections = list(enumerate(item.split('_'))) 1366 sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1]))) 1367 available_chars = max_len - len(sections) 1368 1369 _sections = [(i, s) for i, s in sorted_sections] 1370 _sections_len = sum([len(s) for i, s in _sections]) 1371 _old_sections_len = _sections_len 1372 while _sections_len > available_chars: 1373 _sections = [(i, _shorten(s)) for i, s in _sections] 1374 _old_sections_len = _sections_len 1375 _sections_len = sum([len(s) for i, s in _sections]) 1376 if _old_sections_len == _sections_len: 1377 raise Exception(f"String could not be truncated: '{item}'") 1378 1379 new_sections = sorted(_sections, key=lambda x: x[0]) 1380 return delimeter.join([s for i, s in new_sections])
Remove characters from each section of a string until the length is within the limit.
Parameters
- item (str): The item name to be truncated.
- delimeter (str, default '_'): Split `item` by this string into several sections.
- max_len (int, default 128): The max acceptable length of the truncated version of `item`.
Returns
- The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string which replaces the end of `text` to indicate truncation.
        It counts toward `max_length`.

    Returns
    -------
    A string of length `max_length` or less.
    If `suffix` alone does not fit, the suffix itself is cut to `max_length`.
    """
    if len(text) <= max_length:
        return text

    limit = max(max_length, 0)
    ### Clamp to zero: a negative slice bound would cut from the end of `text`
    ### instead of limiting its length.
    kept_chars = max(limit - len(suffix), 0)
    return (text[:kept_chars] + suffix)[:limit]
Truncate a potentially long string for display purposes.
Parameters
- text (str): The string to be truncated.
- max_length (int, default 50): The maximum length of `text` before truncation.
- suffix (str, default '…'): The string to append to the truncated `text` to indicate truncation.
Returns
- A string of length `max_length` or less.
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude (negated).

    A value is negated when its string form starts with `negation_prefix`.
    Negated values are returned as strings with the prefix stripped;
    all other values are returned unchanged.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Include values that start with this string in the second list.
        If `None`, read the prefix from the static configuration.

    Returns
    -------
    A tuple of two lists: the positive values and the negated values.
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(negation_prefix):
            included.append(val)
            continue
        excluded.append(val_str[len(negation_prefix):])

    return included, excluded
Separate the negated values from the positive ones. Return two lists: positive and negative values.
Parameters
- vals (Union[List[str], Tuple[str]]): A list of strings to parse.
- negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If `None`, use the system default (`_`).
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not a list, tuple, set, or an object with an `astype`
        attribute are treated as a single-item list.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.
    An empty dictionary is returned for empty or missing `params`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    def _as_collection(val: Any) -> Any:
        """Wrap scalar values in a list; pass collections through."""
        if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
            return val
        return [val]

    in_ex_params = {}
    for col, val in params.items():
        in_ex_params[col] = separate_negation_values(_as_collection(val))
    return in_ex_params
Translate a params dictionary into lists of include- and exclude-values.
Parameters
- params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
- A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Recursively flatten a list.

    This is a generator: it lazily yields the non-list items of `list_`
    in depth-first order. Only `list` instances are descended into
    (tuples, sets, and strings are yielded as-is).
    """
    ### Keep a stack of iterators rather than recursing:
    ### the innermost list being walked is always on top.
    pending = [iter(list_)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                pending.append(iter(item))
                break
            yield item
        else:
            ### The innermost iterator is exhausted, so resume its parent.
            pending.pop()
Recursively flatten a list.
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The string is parsed with Python's own grammar, so commas and equals signs
    inside literals (e.g. `[1, 2]` or `"a,b"`) are handled correctly.
    Only literal values are accepted (see `ast.literal_eval`); nothing is executed.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).

    Raises
    ------
    SyntaxError
        If `args_str` is not a syntactically valid argument list.

    ValueError
        If an argument is not a literal, or if `args_str` is not a plain argument list.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], key="a,b"')
    (([1, 2],), {'key': 'a,b'})
    """
    import ast

    ### Wrap the text in a dummy call so the parser splits the arguments.
    ### The newlines keep a trailing comment from swallowing the closing parenthesis.
    call = ast.parse(f"_(\n{args_str}\n)", mode='eval').body
    if (
        not isinstance(call, ast.Call)
        or not isinstance(call.func, ast.Name)
        or call.func.id != '_'
    ):
        raise ValueError(f"Invalid arguments string: '{args_str}'")

    args = tuple(ast.literal_eval(arg) for arg in call.args)

    kwargs = {}
    for keyword in call.keywords:
        ### `keyword.arg` is `None` for `**mapping` unpacking.
        if keyword.arg is None:
            raise ValueError("Unpacking with `**` is not supported in an arguments string.")
        kwargs[keyword.arg] = ast.literal_eval(keyword.value)

    return args, kwargs
Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.
Parameters
- args_str (str): The contents of the function parameter (as a string).
Returns
- A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Create a symlink at `dest_path` pointing to `src_path`,
    falling back to Windows-specific alternatives when `symlink_to` fails.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path (the target of the link).

    dest_path: pathlib.Path
        The destination path (where the link is created).

    Returns
    -------
    A SuccessTuple indicating success.
    On failure, the message is the text of the raised exception.

    Notes
    -----
    If `symlink_to` fails on Windows, a junction is created for directories
    (via `_winapi.CreateJunction`), and files are copied as a last resort.
    """
    already_linked = dest_path.exists() and dest_path.resolve() == src_path.resolve()
    if already_linked:
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows has fallbacks, so report the error everywhere else.
    import platform
    if platform.system() != 'Windows':
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories may be linked with a junction instead.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)

    return True, "Success"
Wrap around `pathlib.Path.symlink_to`, but add support for Windows.
Parameters
- src_path (pathlib.Path): The source path.
- dest_path (pathlib.Path): The destination path.
Returns
- A SuccessTuple indicating success.
def is_symlink(path: pathlib.Path) -> bool:
    """
    Return whether `path` is a symlink, also checking `os.readlink()` on Windows
    (to add support for junctions).
    """
    import os
    import platform

    if path.is_symlink():
        return True

    ### Only Windows needs the additional `readlink` check.
    if platform.system() != 'Windows':
        return False

    try:
        link_target = os.readlink(path)
    except OSError:
        return False
    return bool(link_target)
Wrap `path.is_symlink()` but add support for Windows junctions.
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    The decorated decorator `dec` must accept the function to be wrapped as its
    first argument; the remaining positional and keyword arguments are supplied
    when the decorator is applied (e.g. `@dec(1, foo='bar')`).

    https://stackoverflow.com/a/26151604/9699829

    Parameters
    ----------
    dec: Callable[..., Any]
        The decorator function to be parametrized.

    Returns
    -------
    A callable which takes the decorator's parameters and returns
    the actual decorator. It retains the name and docstring of `dec`.
    """
    from functools import wraps

    @wraps(dec)
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl

    ### `wraps` also sets `__wrapped__`, which would make `inspect.signature()` report
    ### the signature of `dec` (with the leading function parameter `layer` does not take).
    del layer.__wrapped__
    return layer
A meta-decorator for allowing other decorator functions to have parameters.
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559 (path traversal through member names).

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member's name resolves to a path outside of `output_dir`.
    Nothing is extracted in this case.

    Notes
    -----
    Only the members' names are validated (not the targets of link members).
    """
    import os

    def is_within_directory(directory, target) -> bool:
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        try:
            ### NOTE: `commonpath` compares whole path components.
            ### The character-based `commonprefix` would accept sibling directories
            ### sharing a name prefix (e.g. `/tmp/out_evil` for `/tmp/out`).
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            ### The paths cannot be compared (e.g. they are on different drives).
            return False
        return common_path == abs_directory

    ### Validate every member before extracting anything.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(output_dir, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)
Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.
Parameters
- tarf (file): The TAR file opened with `tarfile.open(path, 'r:gz')`.
- output_dir (Union[str, pathlib.Path]): The output directory.
def to_snake_case(name: str) -> str:
    """
    Return the given string in snake-case-style.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Notes
    -----
    A capitalized word which follows whitespace yields two underscores
    (e.g. `"Hello World"` becomes `'hello__world'`).

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re

    ### The substitutions are applied in order.
    substitutions = (
        ### Separate a capitalized word from the character before it.
        (r'(.)([A-Z][a-z]+)', r'\1_\2'),
        ### Separate a lowercase letter or digit from a following capital.
        (r'([a-z0-9])([A-Z])', r'\1_\2'),
        ### Drop punctuation.
        (r'[^\w\s]', ''),
        ### Collapse runs of whitespace into a single underscore.
        (r'\s+', '_'),
    )

    snake_name = name
    for pattern, replacement in substitutions:
        snake_name = re.sub(pattern, replacement, snake_name)
    return snake_name.lower()
Return the given string in snake-case-style.
Parameters
- name (str): The input text to convert to snake case.
Returns
- A snake-case version of `name`.
Examples
>>> to_snake_case("HelloWorld!")
'hello_world'
>>> to_snake_case("This has spaces in it.")
'this_has_spaces_in_it'
>>> to_snake_case("already_in_snake_case")
'already_in_snake_case'
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim kept to prevent breaking existing callers.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`
    (imported at call time), and its result is returned.
    """
    from meerschaum.actions import choose_subaction as _delegate
    return _delegate(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See `meerschaum.actions.choose_subaction`.
def print_options(*args, **kwargs) -> None:
    """
    Legacy shim kept to prevent breaking existing callers.
    All arguments are forwarded to `meerschaum.utils.formatting.print_options`
    (imported at call time), and its result is returned.
    """
    from meerschaum.utils.formatting import print_options as _delegate
    return _delegate(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See `meerschaum.utils.formatting.print_options`.
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    This delegates to `meerschaum.utils.dtypes.serialize_datetime`
    and is suitable as the `default` callable for `json.dumps()`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime as _serialize
    return _serialize(dt)
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'