meerschaum.utils.misc
Miscellaneous functions go here
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4""" 5Miscellaneous functions go here 6""" 7 8from __future__ import annotations 9 10import os 11import sys 12import functools 13import pathlib 14from datetime import timedelta, datetime 15 16from meerschaum.utils.typing import ( 17 Union, 18 Any, 19 Callable, 20 Optional, 21 List, 22 Dict, 23 SuccessTuple, 24 Iterable, 25 PipesDict, 26 Tuple, 27 TYPE_CHECKING, 28) 29if TYPE_CHECKING: 30 import collections 31 32__pdoc__: Dict[str, bool] = { 33 'to_pandas_dtype': False, 34 'filter_unseen_df': False, 35 'add_missing_cols_to_df': False, 36 'parse_df_datetimes': False, 37 'df_from_literal': False, 38 'get_json_cols': False, 39 'get_unhashable_cols': False, 40 'enforce_dtypes': False, 41 'get_datetime_bound_from_df': False, 42 'df_is_chunk_generator': False, 43 'choices_docstring': False, 44 '_get_subaction_names': False, 45 'is_pipe_registered': False, 46 'replace_pipes_in_dict': False, 47 'round_time': False, 48} 49 50 51def add_method_to_class( 52 func: Callable[[Any], Any], 53 class_def: 'Class', 54 method_name: Optional[str] = None, 55 keep_self: Optional[bool] = None, 56) -> Callable[[Any], Any]: 57 """ 58 Add function `func` to class `class_def`. 59 60 Parameters 61 ---------- 62 func: Callable[[Any], Any] 63 Function to be added as a method of the class 64 65 class_def: Class 66 Class to be modified. 67 68 method_name: Optional[str], default None 69 New name of the method. None will use func.__name__ (default). 70 71 Returns 72 ------- 73 The modified function object. 
74 75 """ 76 from functools import wraps 77 78 is_class = isinstance(class_def, type) 79 80 @wraps(func) 81 def wrapper(self, *args, **kw): 82 return func(*args, **kw) 83 84 if method_name is None: 85 method_name = func.__name__ 86 87 setattr(class_def, method_name, ( 88 wrapper if ((is_class and keep_self is None) or keep_self is False) else func 89 ) 90 ) 91 92 return func 93 94 95def generate_password(length: int = 12) -> str: 96 """ 97 Generate a secure password of given length. 98 99 Parameters 100 ---------- 101 length: int, default 12 102 The length of the password. 103 104 Returns 105 ------- 106 A random password string. 107 """ 108 import secrets 109 import string 110 return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length))) 111 112 113def is_int(s: str) -> bool: 114 """ 115 Check if string is an int. 116 117 Parameters 118 ---------- 119 s: str 120 The string to be checked. 121 122 Returns 123 ------- 124 A bool indicating whether the string was able to be cast to an integer. 125 126 """ 127 try: 128 return float(s).is_integer() 129 except Exception: 130 return False 131 132 133def is_uuid(s: str) -> bool: 134 """ 135 Check if a string is a valid UUID. 136 137 Parameters 138 ---------- 139 s: str 140 The string to be checked. 141 142 Returns 143 ------- 144 A bool indicating whether the string is a valid UUID. 145 """ 146 import uuid 147 try: 148 uuid.UUID(str(s)) 149 return True 150 except Exception: 151 return False 152 153 154def string_to_dict(params_string: str) -> Dict[str, Any]: 155 """ 156 Parse a string into a dictionary. 157 158 If the string begins with '{', parse as JSON. Otherwise use simple parsing. 159 160 Parameters 161 ---------- 162 params_string: str 163 The string to be parsed. 164 165 Returns 166 ------- 167 The parsed dictionary. 
168 169 Examples 170 -------- 171 >>> string_to_dict("a:1,b:2") 172 {'a': 1, 'b': 2} 173 >>> string_to_dict('{"a": 1, "b": 2}') 174 {'a': 1, 'b': 2} 175 176 """ 177 if not params_string: 178 return {} 179 180 import json 181 182 ### Kind of a weird edge case. 183 ### In the generated compose file, there is some weird escaping happening, 184 ### so the string to be parsed starts and ends with a single quote. 185 if ( 186 isinstance(params_string, str) 187 and len(params_string) > 4 188 and params_string[1] == "{" 189 and params_string[-2] == "}" 190 ): 191 return json.loads(params_string[1:-1]) 192 193 if str(params_string).startswith('{'): 194 return json.loads(params_string) 195 196 import ast 197 params_dict = {} 198 199 items = [] 200 bracket_level = 0 201 brace_level = 0 202 current_item = '' 203 in_quotes = False 204 quote_char = '' 205 206 i = 0 207 while i < len(params_string): 208 char = params_string[i] 209 210 if in_quotes: 211 if char == quote_char and (i == 0 or params_string[i-1] != '\\'): 212 in_quotes = False 213 else: 214 if char in ('"', "'"): 215 in_quotes = True 216 quote_char = char 217 elif char == '[': 218 bracket_level += 1 219 elif char == ']': 220 bracket_level -= 1 221 elif char == '{': 222 brace_level += 1 223 elif char == '}': 224 brace_level -= 1 225 elif char == ',' and bracket_level == 0 and brace_level == 0: 226 items.append(current_item) 227 current_item = '' 228 i += 1 229 continue 230 231 current_item += char 232 i += 1 233 234 if current_item: 235 items.append(current_item) 236 237 for param in items: 238 param = param.strip() 239 if not param: 240 continue 241 242 _keys = param.split(":", maxsplit=1) 243 if len(_keys) != 2: 244 continue 245 246 keys = _keys[:-1] 247 try: 248 val = ast.literal_eval(_keys[-1]) 249 except Exception: 250 val = str(_keys[-1]) 251 252 c = params_dict 253 for _k in keys[:-1]: 254 try: 255 k = ast.literal_eval(_k) 256 except Exception: 257 k = str(_k) 258 if k not in c: 259 c[k] = {} 260 c = c[k] 261 
262 c[keys[-1]] = val 263 264 return params_dict 265 266 267def to_simple_dict(doc: Dict[str, Any]) -> str: 268 """ 269 Serialize a document dictionary in simple-dict format. 270 """ 271 import json 272 import ast 273 from meerschaum.utils.dtypes import json_serialize_value 274 275 def serialize_value(value): 276 if isinstance(value, str): 277 try: 278 evaluated = ast.literal_eval(value) 279 if not isinstance(evaluated, str): 280 return json.dumps(value, separators=(',', ':'), default=json_serialize_value) 281 return value 282 except (ValueError, SyntaxError, TypeError, MemoryError): 283 return value 284 285 return json.dumps(value, separators=(',', ':'), default=json_serialize_value) 286 287 return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items()) 288 289 290def parse_config_substitution( 291 value: str, 292 leading_key: str = 'MRSM', 293 begin_key: str = '{', 294 end_key: str = '}', 295 delimeter: str = ':', 296) -> List[Any]: 297 """ 298 Parse Meerschaum substitution syntax 299 E.g. MRSM{value1:value2} => ['value1', 'value2'] 300 NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`. 301 """ 302 if not value.beginswith(leading_key): 303 return value 304 305 return leading_key[len(leading_key):][len():-1].split(delimeter) 306 307 308def edit_file( 309 path: Union[pathlib.Path, str], 310 default_editor: str = 'pyvim', 311 debug: bool = False 312) -> bool: 313 """ 314 Open a file for editing. 315 316 Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`. 317 318 Parameters 319 ---------- 320 path: Union[pathlib.Path, str] 321 The path to the file to be edited. 322 323 default_editor: str, default 'pyvim' 324 If `$EDITOR` is not set, use this instead. 325 If `pyvim` is not installed, it will install it from PyPI. 326 327 debug: bool, default False 328 Verbosity toggle. 329 330 Returns 331 ------- 332 A bool indicating the file was successfully edited. 
333 """ 334 from subprocess import call 335 from meerschaum.utils.debug import dprint 336 from meerschaum.utils.packages import run_python_package, attempt_import, package_venv 337 try: 338 EDITOR = os.environ.get('EDITOR', default_editor) 339 if debug: 340 dprint(f"Opening file '{path}' with editor '{EDITOR}'...") 341 rc = call([EDITOR, path]) 342 except Exception as e: ### can't open with default editors 343 if debug: 344 dprint(str(e)) 345 dprint('Failed to open file with system editor. Falling back to pyvim...') 346 pyvim = attempt_import('pyvim', lazy=False) 347 rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug) 348 return rc == 0 349 350 351def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]: 352 """ 353 Determine the columns and lines in the terminal. 354 If they cannot be determined, return the default values (100 columns and 120 lines). 355 356 Parameters 357 ---------- 358 default_cols: int, default 100 359 If the columns cannot be determined, return this value. 360 361 default_lines: int, default 120 362 If the lines cannot be determined, return this value. 363 364 Returns 365 ------- 366 A tuple if integers for the columns and lines. 367 """ 368 try: 369 size = os.get_terminal_size() 370 _cols, _lines = size.columns, size.lines 371 except Exception: 372 _cols, _lines = ( 373 int(os.environ.get('COLUMNS', str(default_cols))), 374 int(os.environ.get('LINES', str(default_lines))), 375 ) 376 return _cols, _lines 377 378 379def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None): 380 """ 381 Iterate over a list in chunks. 382 https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks 383 384 Parameters 385 ---------- 386 iterable: Iterable[Any] 387 The iterable to iterate over in chunks. 388 389 chunksize: int 390 The size of chunks to iterate with. 
391 392 fillvalue: Optional[Any], default None 393 If the chunks do not evenly divide into the iterable, pad the end with this value. 394 395 Returns 396 ------- 397 A generator of tuples of size `chunksize`. 398 399 """ 400 from itertools import zip_longest 401 args = [iter(iterable)] * chunksize 402 return zip_longest(*args, fillvalue=fillvalue) 403 404def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]: 405 """ 406 Sort a dictionary's values and return a new dictionary. 407 408 Parameters 409 ---------- 410 d: Dict[Any, Any] 411 The dictionary to be sorted. 412 413 Returns 414 ------- 415 A sorted dictionary. 416 417 Examples 418 -------- 419 >>> sorted_dict({'b': 1, 'a': 2}) 420 {'b': 1, 'a': 2} 421 >>> sorted_dict({'b': 2, 'a': 1}) 422 {'a': 1, 'b': 2} 423 424 """ 425 try: 426 return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])} 427 except Exception: 428 return d 429 430 431def timed_input( 432 seconds: int = 10, 433 timeout_message: str = "", 434 prompt: str = "", 435 icon: bool = False, 436 **kw 437) -> Union[str, None]: 438 """ 439 Accept user input only for a brief period of time. 440 441 Parameters 442 ---------- 443 seconds: int, default 10 444 The number of seconds to wait. 445 446 timeout_message: str, default '' 447 The message to print after the window has elapsed. 448 449 prompt: str, default '' 450 The prompt to print during the window. 451 452 icon: bool, default False 453 If `True`, print the configured input icon. 454 455 456 Returns 457 ------- 458 The input string entered by the user. 
459 460 """ 461 import signal, time 462 463 class TimeoutExpired(Exception): 464 """Raise this exception when the timeout is reached.""" 465 466 def alarm_handler(signum, frame): 467 raise TimeoutExpired 468 469 # set signal handler 470 signal.signal(signal.SIGALRM, alarm_handler) 471 signal.alarm(seconds) # produce SIGALRM in `timeout` seconds 472 473 try: 474 return input(prompt) 475 except TimeoutExpired: 476 return None 477 except (EOFError, RuntimeError): 478 try: 479 print(prompt) 480 time.sleep(seconds) 481 except TimeoutExpired: 482 return None 483 finally: 484 signal.alarm(0) # cancel alarm 485 486 487def enforce_gevent_monkey_patch(): 488 """ 489 Check if gevent monkey patching is enabled, and if not, then apply patching. 490 """ 491 from meerschaum.utils.packages import attempt_import 492 import socket 493 gevent, gevent_socket, gevent_monkey = attempt_import( 494 'gevent', 'gevent.socket', 'gevent.monkey' 495 ) 496 if not socket.socket is gevent_socket.socket: 497 gevent_monkey.patch_all() 498 499def is_valid_email(email: str) -> Union['re.Match', None]: 500 """ 501 Check whether a string is a valid email. 502 503 Parameters 504 ---------- 505 email: str 506 The string to be examined. 507 508 Returns 509 ------- 510 None if a string is not in email format, otherwise a `re.Match` object, which is truthy. 511 512 Examples 513 -------- 514 >>> is_valid_email('foo') 515 >>> is_valid_email('foo@foo.com') 516 <re.Match object; span=(0, 11), match='foo@foo.com'> 517 518 """ 519 import re 520 regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$' 521 return re.search(regex, email) 522 523 524def string_width(string: str, widest: bool = True) -> int: 525 """ 526 Calculate the width of a string, either by its widest or last line. 527 528 Parameters 529 ---------- 530 string: str: 531 The string to be examined. 532 533 widest: bool, default True 534 No longer used because `widest` is always assumed to be true. 
535 536 Returns 537 ------- 538 An integer for the text's visual width. 539 540 Examples 541 -------- 542 >>> string_width('a') 543 1 544 >>> string_width('a\\nbc\\nd') 545 2 546 547 """ 548 def _widest(): 549 words = string.split('\n') 550 max_length = 0 551 for w in words: 552 length = len(w) 553 if length > max_length: 554 max_length = length 555 return max_length 556 557 return _widest() 558 559def _pyinstaller_traverse_dir( 560 directory: str, 561 ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), 562 include_dotfiles: bool = False 563) -> list: 564 """ 565 Recursively traverse a directory and return a list of its contents. 566 """ 567 paths = [] 568 _directory = pathlib.Path(directory) 569 570 def _found_pattern(name: str): 571 for pattern in ignore_patterns: 572 if pattern.replace('/', os.path.sep) in str(name): 573 return True 574 return False 575 576 for root, dirs, files in os.walk(_directory): 577 _root = str(root)[len(str(_directory.parent)):] 578 if _root.startswith(os.path.sep): 579 _root = _root[len(os.path.sep):] 580 if _root.startswith('.') and not include_dotfiles: 581 continue 582 ### ignore certain patterns 583 if _found_pattern(_root): 584 continue 585 586 for filename in files: 587 if filename.startswith('.') and not include_dotfiles: 588 continue 589 path = os.path.join(root, filename) 590 if _found_pattern(path): 591 continue 592 593 _path = str(path)[len(str(_directory.parent)):] 594 if _path.startswith(os.path.sep): 595 _path = _path[len(os.path.sep):] 596 _path = os.path.sep.join(_path.split(os.path.sep)[:-1]) 597 598 paths.append((path, _path)) 599 return paths 600 601 602def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any: 603 """ 604 Get a value from a dictionary with a tuple of keys. 605 606 Parameters 607 ---------- 608 d: Dict[Any, Any] 609 The dictionary to search. 610 611 path: Tuple[Any, ...] 612 The path of keys to traverse. 
613 614 Returns 615 ------- 616 The value from the end of the path. 617 """ 618 return functools.reduce(lambda di, key: di[key], path, d) 619 620 621def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None: 622 """ 623 Set a value in a dictionary with a tuple of keys. 624 625 Parameters 626 ---------- 627 d: Dict[Any, Any] 628 The dictionary to search. 629 630 path: Tuple[Any, ...] 631 The path of keys to traverse. 632 633 val: Any 634 The value to set at the end of the path. 635 """ 636 get_val_from_dict_path(d, path[:-1])[path[-1]] = val 637 638 639def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]: 640 """ 641 Recursively replace passwords in a dictionary. 642 643 Parameters 644 ---------- 645 d: Dict[str, Any] 646 The dictionary to search through. 647 648 replace_with: str, default '*' 649 The string to replace each character of the password with. 650 651 Returns 652 ------- 653 Another dictionary where values to the keys `'password'` 654 are replaced with `replace_with` (`'*'`). 
655 656 Examples 657 -------- 658 >>> replace_password({'a': 1}) 659 {'a': 1} 660 >>> replace_password({'password': '123'}) 661 {'password': '***'} 662 >>> replace_password({'nested': {'password': '123'}}) 663 {'nested': {'password': '***'}} 664 >>> replace_password({'password': '123'}, replace_with='!') 665 {'password': '!!!'} 666 667 """ 668 import copy 669 _d = copy.deepcopy(d) 670 for k, v in d.items(): 671 if isinstance(v, dict): 672 _d[k] = replace_password(v) 673 elif 'password' in str(k).lower(): 674 _d[k] = ''.join([replace_with for char in str(v)]) 675 elif str(k).lower() == 'uri': 676 from meerschaum.connectors.sql import SQLConnector 677 try: 678 uri_params = SQLConnector.parse_uri(v) 679 except Exception: 680 uri_params = None 681 if not uri_params: 682 continue 683 if not 'username' in uri_params or not 'password' in uri_params: 684 continue 685 _d[k] = v.replace( 686 uri_params['username'] + ':' + uri_params['password'], 687 uri_params['username'] + ':' + ''.join( 688 [replace_with for char in str(uri_params['password'])] 689 ) 690 ) 691 return _d 692 693 694def filter_arguments( 695 func: Callable[[Any], Any], 696 *args: Any, 697 **kwargs: Any 698) -> Tuple[Tuple[Any], Dict[str, Any]]: 699 """ 700 Filter out unsupported positional and keyword arguments. 701 702 Parameters 703 ---------- 704 func: Callable[[Any], Any] 705 The function to inspect. 706 707 *args: Any 708 Positional arguments to filter and pass to `func`. 709 710 **kwargs 711 Keyword arguments to filter and pass to `func`. 712 713 Returns 714 ------- 715 The `args` and `kwargs` accepted by `func`. 716 """ 717 args = filter_positionals(func, *args) 718 kwargs = filter_keywords(func, **kwargs) 719 return args, kwargs 720 721 722def filter_keywords( 723 func: Callable[[Any], Any], 724 **kw: Any 725) -> Dict[str, Any]: 726 """ 727 Filter out unsupported keyword arguments. 728 729 Parameters 730 ---------- 731 func: Callable[[Any], Any] 732 The function to inspect. 
733 734 **kw: Any 735 The arguments to be filtered and passed into `func`. 736 737 Returns 738 ------- 739 A dictionary of keyword arguments accepted by `func`. 740 741 Examples 742 -------- 743 ```python 744 >>> def foo(a=1, b=2): 745 ... return a * b 746 >>> filter_keywords(foo, a=2, b=4, c=6) 747 {'a': 2, 'b': 4} 748 >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6})) 749 8 750 ``` 751 752 """ 753 import inspect 754 func_params = inspect.signature(func).parameters 755 ### If the function has a **kw method, skip filtering. 756 for param, _type in func_params.items(): 757 if '**' in str(_type): 758 return kw 759 return {k: v for k, v in kw.items() if k in func_params} 760 761 762def filter_positionals( 763 func: Callable[[Any], Any], 764 *args: Any 765) -> Tuple[Any]: 766 """ 767 Filter out unsupported positional arguments. 768 769 Parameters 770 ---------- 771 func: Callable[[Any], Any] 772 The function to inspect. 773 774 *args: Any 775 The arguments to be filtered and passed into `func`. 776 NOTE: If the function signature expects more arguments than provided, 777 the missing slots will be filled with `None`. 778 779 Returns 780 ------- 781 A tuple of positional arguments accepted by `func`. 782 783 Examples 784 -------- 785 ```python 786 >>> def foo(a, b): 787 ... return a * b 788 >>> filter_positionals(foo, 2, 4, 6) 789 (2, 4) 790 >>> foo(*filter_positionals(foo, 2, 4, 6)) 791 8 792 ``` 793 794 """ 795 import inspect 796 from meerschaum.utils.warnings import warn 797 func_params = inspect.signature(func).parameters 798 acceptable_args: List[Any] = [] 799 800 def _warn_invalids(_num_invalid): 801 if _num_invalid > 0: 802 warn( 803 "Too few arguments were provided. 
" 804 + f"{_num_invalid} argument" 805 + ('s have ' if _num_invalid != 1 else " has ") 806 + " been filled with `None`.", 807 ) 808 809 num_invalid: int = 0 810 for i, (param, val) in enumerate(func_params.items()): 811 if '=' in str(val) or '*' in str(val): 812 _warn_invalids(num_invalid) 813 return tuple(acceptable_args) 814 815 try: 816 acceptable_args.append(args[i]) 817 except IndexError: 818 acceptable_args.append(None) 819 num_invalid += 1 820 821 _warn_invalids(num_invalid) 822 return tuple(acceptable_args) 823 824 825def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]: 826 """ 827 Convert an ordered dict to a dict. 828 Does not mutate the original OrderedDict. 829 """ 830 from collections import OrderedDict 831 _d = dict(od) 832 for k, v in od.items(): 833 if isinstance(v, OrderedDict) or ( 834 issubclass(type(v), OrderedDict) 835 ): 836 _d[k] = dict_from_od(v) 837 return _d 838 839 840def remove_ansi(s: str) -> str: 841 """ 842 Remove ANSI escape characters from a string. 843 844 Parameters 845 ---------- 846 s: str: 847 The string to be cleaned. 848 849 Returns 850 ------- 851 A string with the ANSI characters removed. 852 853 Examples 854 -------- 855 >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m") 856 'Hello, World!' 857 858 """ 859 import re 860 return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s) 861 862 863def get_connector_labels( 864 *types: str, 865 search_term: str = '', 866 ignore_exact_match = True, 867 _additional_options: Optional[List[str]] = None, 868) -> List[str]: 869 """ 870 Read connector labels from the configuration dictionary. 871 872 Parameters 873 ---------- 874 *types: str 875 The connector types. 876 If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`. 877 878 search_term: str, default '' 879 A filter on the connectors' labels. 880 881 ignore_exact_match: bool, default True 882 If `True`, skip a connector if the search_term is an exact match. 
883 884 Returns 885 ------- 886 A list of the keys of defined connectors. 887 888 """ 889 from meerschaum.config import get_config 890 connectors = get_config('meerschaum', 'connectors') 891 892 _types = list(types) 893 if len(_types) == 0: 894 _types = list(connectors.keys()) + ['plugin'] 895 896 conns = [] 897 for t in _types: 898 if t == 'plugin': 899 from meerschaum.plugins import get_data_plugins 900 conns += [ 901 f'{t}:' + plugin.module.__name__.split('.')[-1] 902 for plugin in get_data_plugins() 903 ] 904 continue 905 conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ] 906 907 if _additional_options: 908 conns += _additional_options 909 910 possibilities = [ 911 c 912 for c in conns 913 if c.startswith(search_term) 914 and c != ( 915 search_term if ignore_exact_match else '' 916 ) 917 ] 918 return sorted(possibilities) 919 920 921def wget( 922 url: str, 923 dest: Optional[Union[str, 'pathlib.Path']] = None, 924 headers: Optional[Dict[str, Any]] = None, 925 color: bool = True, 926 debug: bool = False, 927 **kw: Any 928) -> 'pathlib.Path': 929 """ 930 Mimic `wget` with `requests`. 931 932 Parameters 933 ---------- 934 url: str 935 The URL to the resource to be downloaded. 936 937 dest: Optional[Union[str, pathlib.Path]], default None 938 The destination path of the downloaded file. 939 If `None`, save to the current directory. 940 941 color: bool, default True 942 If `debug` is `True`, print color output. 943 944 debug: bool, default False 945 Verbosity toggle. 946 947 Returns 948 ------- 949 The path to the downloaded file. 
950 951 """ 952 from meerschaum.utils.warnings import warn, error 953 from meerschaum.utils.debug import dprint 954 import re, urllib.request 955 if headers is None: 956 headers = {} 957 request = urllib.request.Request(url, headers=headers) 958 if not color: 959 dprint = print 960 if debug: 961 dprint(f"Downloading from '{url}'...") 962 try: 963 response = urllib.request.urlopen(request) 964 except Exception as e: 965 import ssl 966 ssl._create_default_https_context = ssl._create_unverified_context 967 try: 968 response = urllib.request.urlopen(request) 969 except Exception as _e: 970 print(_e) 971 response = None 972 if response is None or response.code != 200: 973 error_msg = f"Failed to download from '{url}'." 974 if color: 975 error(error_msg) 976 else: 977 print(error_msg) 978 import sys 979 sys.exit(1) 980 981 d = response.headers.get('content-disposition', None) 982 fname = ( 983 re.findall("filename=(.+)", d)[0].strip('"') if d is not None 984 else url.split('/')[-1] 985 ) 986 987 if dest is None: 988 dest = pathlib.Path(os.path.join(os.getcwd(), fname)) 989 elif isinstance(dest, str): 990 dest = pathlib.Path(dest) 991 992 with open(dest, 'wb') as f: 993 f.write(response.fp.read()) 994 995 if debug: 996 dprint(f"Downloaded file '{dest}'.") 997 998 return dest 999 1000 1001def async_wrap(func): 1002 """ 1003 Run a synchronous function as async. 1004 https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn 1005 """ 1006 import asyncio 1007 from functools import wraps, partial 1008 1009 @wraps(func) 1010 async def run(*args, loop=None, executor=None, **kwargs): 1011 if loop is None: 1012 loop = asyncio.get_event_loop() 1013 pfunc = partial(func, *args, **kwargs) 1014 return await loop.run_in_executor(executor, pfunc) 1015 return run 1016 1017 1018def debug_trace(browser: bool = True): 1019 """ 1020 Open a web-based debugger to trace the execution of the program. 1021 1022 This is an alias import for `meerschaum.utils.debug.debug_trace`. 
1023 """ 1024 from meerschaum.utils.debug import trace 1025 trace(browser=browser) 1026 1027 1028def items_str( 1029 items: List[Any], 1030 quotes: bool = True, 1031 quote_str: str = "'", 1032 commas: bool = True, 1033 comma_str: str = ',', 1034 and_: bool = True, 1035 and_str: str = 'and', 1036 oxford_comma: bool = True, 1037 spaces: bool = True, 1038 space_str = ' ', 1039) -> str: 1040 """ 1041 Return a formatted string if list items separated by commas. 1042 1043 Parameters 1044 ---------- 1045 items: [List[Any]] 1046 The items to be printed as an English list. 1047 1048 quotes: bool, default True 1049 If `True`, wrap items in quotes. 1050 1051 quote_str: str, default "'" 1052 If `quotes` is `True`, prepend and append each item with this string. 1053 1054 and_: bool, default True 1055 If `True`, include the word 'and' before the final item in the list. 1056 1057 and_str: str, default 'and' 1058 If `and_` is True, insert this string where 'and' normally would in and English list. 1059 1060 oxford_comma: bool, default True 1061 If `True`, include the Oxford Comma (comma before the final 'and'). 1062 Only applies when `and_` is `True`. 1063 1064 spaces: bool, default True 1065 If `True`, separate items with `space_str` 1066 1067 space_str: str, default ' ' 1068 If `spaces` is `True`, separate items with this string. 1069 1070 Returns 1071 ------- 1072 A string of the items as an English list. 
1073 1074 Examples 1075 -------- 1076 >>> items_str([1,2,3]) 1077 "'1', '2', and '3'" 1078 >>> items_str([1,2,3], quotes=False) 1079 '1, 2, and 3' 1080 >>> items_str([1,2,3], and_=False) 1081 "'1', '2', '3'" 1082 >>> items_str([1,2,3], spaces=False, and_=False) 1083 "'1','2','3'" 1084 >>> items_str([1,2,3], oxford_comma=False) 1085 "'1', '2' and '3'" 1086 >>> items_str([1,2,3], quote_str=":") 1087 ':1:, :2:, and :3:' 1088 >>> items_str([1,2,3], and_str="or") 1089 "'1', '2', or '3'" 1090 >>> items_str([1,2,3], space_str="_") 1091 "'1',_'2',_and_'3'" 1092 1093 """ 1094 if not items: 1095 return '' 1096 1097 q = quote_str if quotes else '' 1098 s = space_str if spaces else '' 1099 a = and_str if and_ else '' 1100 c = comma_str if commas else '' 1101 1102 if len(items) == 1: 1103 return q + str(list(items)[0]) + q 1104 1105 if len(items) == 2: 1106 return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q 1107 1108 sep = q + c + s + q 1109 output = q + sep.join(str(i) for i in items[:-1]) + q 1110 if oxford_comma: 1111 output += c 1112 output += s + a + (s if and_ else '') + q + str(items[-1]) + q 1113 return output 1114 1115 1116def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str: 1117 """ 1118 Return a human-readable string for a `timedelta` (or `int` minutes). 1119 1120 Parameters 1121 ---------- 1122 delta: Union[timedelta, int] 1123 The interval to print. If `delta` is an integer, assume it corresponds to minutes. 1124 1125 round_unit: bool, default False 1126 If `True`, round the output to a single unit. 1127 1128 Returns 1129 ------- 1130 A formatted string, fit for human eyes. 
1131 """ 1132 from meerschaum.utils.packages import attempt_import 1133 if is_int(str(delta)) and not round_unit: 1134 return str(delta) 1135 1136 humanfriendly = attempt_import('humanfriendly', lazy=False) 1137 delta_seconds = ( 1138 delta.total_seconds() 1139 if hasattr(delta, 'total_seconds') 1140 else (delta * 60) 1141 ) 1142 1143 is_negative = delta_seconds < 0 1144 delta_seconds = abs(delta_seconds) 1145 replace_units = {} 1146 1147 if round_unit: 1148 if delta_seconds < 1: 1149 delta_seconds = round(delta_seconds, 2) 1150 elif delta_seconds < 60: 1151 delta_seconds = int(delta_seconds) 1152 elif delta_seconds < 3600: 1153 delta_seconds = int(delta_seconds / 60) * 60 1154 elif delta_seconds < 86400: 1155 delta_seconds = int(delta_seconds / 3600) * 3600 1156 elif delta_seconds < (86400 * 7): 1157 delta_seconds = int(delta_seconds / 86400) * 86400 1158 elif delta_seconds < (86400 * 7 * 4): 1159 delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7) 1160 elif delta_seconds < (86400 * 7 * 4 * 13): 1161 delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7) 1162 replace_units['weeks'] = 'months' 1163 else: 1164 delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364) 1165 1166 delta_str = humanfriendly.format_timespan(delta_seconds) 1167 if ',' in delta_str and round_unit: 1168 delta_str = delta_str.split(',')[0] 1169 elif ' and ' in delta_str and round_unit: 1170 delta_str = delta_str.split(' and ')[0] 1171 1172 for parsed_unit, replacement_unit in replace_units.items(): 1173 delta_str = delta_str.replace(parsed_unit, replacement_unit) 1174 1175 return delta_str + (' ago' if is_negative else '') 1176 1177 1178def is_docker_available() -> bool: 1179 """Check if we can connect to the Docker engine.""" 1180 import subprocess 1181 try: 1182 has_docker = subprocess.call( 1183 ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1184 ) == 0 1185 except Exception: 1186 has_docker = False 1187 return has_docker 1188 1189 
def is_android() -> bool:
    """Return `True` if the current platform is Android."""
    import sys
    return hasattr(sys, 'getandroidapilevel')


def is_bcp_available() -> bool:
    """Check if the MSSQL `bcp` utility is installed."""
    import subprocess

    try:
        has_bcp = subprocess.call(
            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        has_bcp = False
    return has_bcp


def is_systemd_available() -> bool:
    """Check if running on systemd."""
    return os.path.isdir('/run/systemd/system')


def is_tmux_available() -> bool:
    """
    Check if `tmux` is installed.
    """
    import subprocess
    try:
        has_tmux = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        ### A missing executable raises `FileNotFoundError`, which is an `Exception`.
        has_tmux = False
    return has_tmux


def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file, in file order, reading backwards from the end.

    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read.

    N: int
        The maximum number of lines to return.

    Returns
    -------
    A list of decoded lines (without newline characters).
    If the file ends with a newline, the final element is an empty string.
    """
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at the end of the file and walk backwards one byte at a time.
        read_obj.seek(0, os.SEEK_END)
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### The buffer holds the line's bytes in reverse order.
                ### Restore the byte order *before* decoding so that
                ### multi-byte characters are decoded correctly.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)

        ### Anything left in the buffer is the first line of the file.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())

    return list(reversed(list_of_lines))


def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### The estimate becomes a float after the first retry,
            ### and `seek()` only accepts integers.
            f.seek(-int(avg_line_length * to_read), 2)
        except OSError:
            ### The file is smaller than the distance we want to step back,
            ### so go to the beginning instead.
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            end_index = -offset if offset else None
            has_more = len(lines) > to_read or pos > 0
            return lines[-to_read:end_index], has_more
        ### Not enough lines were read: assume longer lines and try again.
        avg_line_length *= 1.3


def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    An `Exception` if every section is down to one character and the string is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'

    """
    ### NOTE: The comparison is strict, so a string of exactly `max_len` characters
    ###       is still shortened. This is kept as-is for backwards compatibility.
    if len(item) < max_len:
        return item

    def _shorten(s: str) -> str:
        return s[:-1] if len(s) > 1 else s

    sections = item.split(delimeter)
    available_chars = max_len - len(sections)
    sections_len = sum(len(section) for section in sections)

    ### Trim one character from every section per pass until the sections fit.
    while sections_len > available_chars:
        sections = [_shorten(section) for section in sections]
        shortened_len = sum(len(section) for section in sections)
        if shortened_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = shortened_len

    return delimeter.join(sections)


def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string to append to the length of `text` to indicate truncation.

    Returns
    -------
    A string of length `max_length` or less.
    """
    text_length = len(text)
    if text_length <= max_length:
        return text

    suffix_length = len(suffix)

    truncated_text = text[:max_length - suffix_length]
    return truncated_text + suffix


def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Separate the negated values from the positive ones.
    Return two lists: positive and negative values.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Include values that start with this string in the second list.
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of two lists: the values without the prefix (unchanged)
    and the negated values as strings with the prefix removed.
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    _in_vals, _ex_vals = [], []
    for v in vals:
        if str(v).startswith(negation_prefix):
            _ex_vals.append(str(v)[len(negation_prefix):])
        else:
            _in_vals.append(v)

    return _in_vals, _ex_vals


def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}
    return {
        col: separate_negation_values(
            (
                val
                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
                else [val]
            )
        )
        for col, val in params.items()
    }


def flatten_list(list_: List[Any]) -> Iterable[Any]:
    """
    Recursively flatten a list.

    This is a generator: it lazily yields the non-list items of `list_`,
    descending into nested lists (other containers are yielded as-is).
    """
    for item in list_:
        if isinstance(item, list):
            yield from flatten_list(item)
        else:
            yield item


def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).
    An empty or blank string returns `((), {})`.

    Raises
    ------
    `ValueError` or `SyntaxError` if a value is not a Python literal.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], sep=","')
    (([1, 2],), {'sep': ','})
    """
    import ast

    if not args_str or not args_str.strip():
        return (), {}

    ### Let Python's own parser split the arguments so that commas and equals signs
    ### inside of strings, lists, and dictionaries are handled correctly.
    try:
        call_node = ast.parse(f"_({args_str}\n)", mode='eval').body
    except SyntaxError:
        call_node = None

    is_plain_call = (
        isinstance(call_node, ast.Call)
        and isinstance(call_node.func, ast.Name)
        and call_node.func.id == '_'
        and not any(isinstance(arg, ast.Starred) for arg in call_node.args)
        and all(keyword.arg is not None for keyword in call_node.keywords)
    )
    if is_plain_call:
        return (
            tuple(ast.literal_eval(arg) for arg in call_node.args),
            {keyword.arg: ast.literal_eval(keyword.value) for keyword in call_node.keywords},
        )

    ### Fall back to the legacy comma-splitting for inputs which are not valid call syntax
    ### (e.g. positional values after keywords or keys which are not identifiers).
    args = []
    kwargs = {}
    for part in args_str.split(','):
        if '=' in part:
            key, val = part.split('=', 1)
            kwargs[key.strip()] = ast.literal_eval(val)
        else:
            args.append(ast.literal_eval(part.strip()))

    return tuple(args), kwargs


def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path.

    dest_path: pathlib.Path
        The destination path.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        msg = str(e)
    else:
        return True, "Success"

    ### Failed to create a symlink.
    ### If we're not on Windows, return an error.
    import platform
    if platform.system() != 'Windows':
        return False, msg

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories may be linked with a junction.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)

    return True, "Success"


def is_symlink(path: pathlib.Path) -> bool:
    """
    Wrap `path.is_symlink()` but add support for Windows junctions.
    """
    if path.is_symlink():
        return True

    import platform
    if platform.system() != 'Windows':
        return False
    try:
        return bool(os.readlink(path))
    except OSError:
        return False


def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    The decorated decorator `dec` is called as `dec(f, *args, **kwargs)`,
    where `f` is the function being decorated.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl
    return layer


def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a give directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member would be extracted outside of `output_dir`.
    """

    def is_within_directory(directory, target):
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        ### Compare whole path components: a character-wise prefix would
        ### accept siblings like `/out_evil` for the directory `/out`.
        try:
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            ### E.g. the paths are on different drives.
            return False
        return common_path == abs_directory

    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
        for member in tar.getmembers():
            member_path = os.path.join(path, member.name)
            if not is_within_directory(path, member_path):
                raise Exception("Attempted Path Traversal in Tar File")

        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)

    return safe_extract(tarf, output_dir)


def to_snake_case(name: str) -> str:
    """
    Return the given string in snake-case-style.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re
    ### Insert underscores at the word boundaries of camel-case names.
    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
    ### Drop punctuation, then collapse whitespace into underscores.
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r'\s+', '_', name)
    return name.lower()


def get_directory_size(path: pathlib.Path) -> int:
    """
    Return the cumulative size of a directory's files in bytes.
    https://stackoverflow.com/a/55659577/9699829

    NOTE: Every entry found by `path.rglob('*')` is counted,
    so the sizes reported for subdirectory entries are included in the total.
    """
    return sum(file.stat().st_size for file in path.rglob('*'))


##################
# Legacy imports #
##################

def choose_subaction(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _choose_subactions
    return _choose_subactions(*args, **kwargs)


def print_options(*args, **kwargs) -> None:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.formatting.print_options`.
    """
    from meerschaum.utils.formatting import print_options as _print_options
    return _print_options(*args, **kwargs)


def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype
    return _to_pandas_dtype(*args, **kwargs)


def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as real_function
    return real_function(*args, **kwargs)


def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function
    return real_function(*args, **kwargs)


def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as real_function
    return real_function(*args, **kwargs)


def df_from_literal(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as real_function
    return real_function(*args, **kwargs)


def get_json_cols(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as real_function
    return real_function(*args, **kwargs)


def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as real_function
    return real_function(*args, **kwargs)


def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as real_function
    return real_function(*args, **kwargs)


def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function
    return real_function(*args, **kwargs)


def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as real_function
    return real_function(*args, **kwargs)


def choices_docstring(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as real_function
    return real_function(*args, **kwargs)


def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions._get_subaction_names`.
    """
    from meerschaum.actions import _get_subaction_names as real_function
    return real_function(*args, **kwargs)


def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime
    return serialize_datetime(dt)


def replace_pipes_in_dict(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.replace_pipes_in_dict`.
    """
    from meerschaum.utils.pipes import replace_pipes_in_dict
    return replace_pipes_in_dict(*args, **kwargs)


def is_pipe_registered(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.is_pipe_registered`.
    """
    from meerschaum.utils.pipes import is_pipe_registered
    return is_pipe_registered(*args, **kwargs)


def round_time(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dtypes.round_time`.
    """
    from meerschaum.utils.dtypes import round_time
    return round_time(*args, **kwargs)


def flatten_pipes_dict(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.flatten_pipes_dict`.
    """
    from meerschaum.utils.pipes import flatten_pipes_dict
    return flatten_pipes_dict(*args, **kwargs)


### Export the public callables defined in this module which are not listed in `__pdoc__`.
_current_module = sys.modules[__name__]
__all__ = tuple(
    name
    for name, obj in globals().items()
    if callable(obj)
    and name not in __pdoc__
    and getattr(obj, '__module__', None) == _current_module.__name__
    and not name.startswith('_')
)
def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Attach the function `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or object) to be modified.

    method_name: Optional[str], default None
        The attribute name to assign. If `None`, use `func.__name__`.

    keep_self: Optional[bool], default None
        If `False`, attach a wrapper which discards its first positional argument (`self`)
        before calling `func`. If `None`, the wrapper is only used when `class_def` is a class.
        Otherwise `func` is attached unchanged.

    Returns
    -------
    The original function `func`.

    """
    @functools.wraps(func)
    def _without_self(self, *args, **kw):
        return func(*args, **kw)

    attribute_name = func.__name__ if method_name is None else method_name

    if keep_self is None:
        discard_self = isinstance(class_def, type)
    else:
        discard_self = keep_self is False

    setattr(class_def, attribute_name, _without_self if discard_self else func)
    return func
Add function func to class class_def.
Parameters
- func (Callable[[Any], Any]): Function to be added as a method of the class
- class_def (Class): Class to be modified.
- method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
- The modified function object.
def generate_password(length: int = 12) -> str:
    """
    Build a random alphanumeric password using the `secrets` module.

    Parameters
    ----------
    length: int, default 12
        The number of characters to generate.

    Returns
    -------
    A random string of ASCII letters and digits.
    """
    import secrets
    import string

    characters = []
    for _ in range(length):
        characters.append(secrets.choice(string.ascii_letters + string.digits))
    return ''.join(characters)
Generate a secure password of given length.
Parameters
- length (int, default 12): The length of the password.
Returns
- A random password string.
def is_int(s: str) -> bool:
    """
    Determine whether a value represents a whole number.

    Parameters
    ----------
    s: str
        The value to be checked.

    Returns
    -------
    `True` if `float(s)` succeeds and has no fractional part
    (e.g. `'1'` and `'1.0'`), otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        return False
    return as_float.is_integer()
Check if string is an int.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string was able to be cast to an integer.
def is_uuid(s: str) -> bool:
    """
    Determine whether a value can be parsed as a UUID.

    Parameters
    ----------
    s: str
        The value to be checked (it is converted with `str()` first).

    Returns
    -------
    `True` if `uuid.UUID` accepts the string, otherwise `False`.
    """
    import uuid

    candidate_ok = True
    try:
        uuid.UUID(str(s))
    except Exception:
        candidate_ok = False
    return candidate_ok
Check if a string is a valid UUID.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string is a valid UUID.
def string_to_dict(params_string: str) -> Dict[str, Any]:
    """
    Build a dictionary from its string representation.

    Strings which begin with '{' are parsed as JSON.
    Otherwise the simple `key:value,key:value` syntax is parsed.

    Parameters
    ----------
    params_string: str
        The string to be parsed.

    Returns
    -------
    The parsed dictionary (empty if `params_string` is empty).
    Values are evaluated as Python literals when possible and kept as strings otherwise.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if not params_string:
        return {}

    import ast
    import json

    ### Kind of a weird edge case.
    ### In the generated compose file, there is some weird escaping happening,
    ### so the string to be parsed starts and ends with a single quote.
    wrapped_in_extra_chars = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if wrapped_in_extra_chars:
        return json.loads(params_string[1:-1])

    if str(params_string).startswith('{'):
        return json.loads(params_string)

    ### Split on the commas which are outside of quotes, brackets, and braces.
    pieces = []
    pending = ''
    open_brackets = 0
    open_braces = 0
    active_quote = None

    for position, char in enumerate(params_string):
        if active_quote is not None:
            ### A quote character only closes the string if it is not escaped.
            if char == active_quote and (position == 0 or params_string[position - 1] != '\\'):
                active_quote = None
        elif char in ('"', "'"):
            active_quote = char
        elif char == '[':
            open_brackets += 1
        elif char == ']':
            open_brackets -= 1
        elif char == '{':
            open_braces += 1
        elif char == '}':
            open_braces -= 1
        elif char == ',' and open_brackets == 0 and open_braces == 0:
            pieces.append(pending)
            pending = ''
            continue

        pending += char

    if pending:
        pieces.append(pending)

    ### Each piece is `key:value`; pieces without a colon are ignored.
    parsed = {}
    for piece in pieces:
        entry = piece.strip()
        if not entry:
            continue

        key, colon, raw_value = entry.partition(":")
        if not colon:
            continue

        try:
            value = ast.literal_eval(raw_value)
        except Exception:
            value = str(raw_value)

        parsed[key] = value

    return parsed
Parse a string into a dictionary.
If the string begins with '{', parse as JSON. Otherwise use simple parsing.
Parameters
- params_string (str): The string to be parsed.
Returns
- The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2")
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def to_simple_dict(doc: Dict[str, Any]) -> str:
    """
    Render a document dictionary as a simple-dict string (`key:value,key:value`).

    Strings are written as-is unless they would evaluate to a non-string Python literal
    (e.g. `'1'`), in which case they are JSON-encoded (quoted).
    All other values are JSON-encoded with compact separators.
    """
    import ast
    import json
    from meerschaum.utils.dtypes import json_serialize_value

    def _to_json(value):
        return json.dumps(value, separators=(',', ':'), default=json_serialize_value)

    def _render(value):
        if not isinstance(value, str):
            return _to_json(value)

        try:
            evaluated = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError):
            return value

        if isinstance(evaluated, str):
            return value
        return _to_json(value)

    rendered_items = []
    for key, val in doc.items():
        rendered_items.append(f"{key}:{_render(val)}")
    return ','.join(rendered_items)
Serialize a document dictionary in simple-dict format.
def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':',
) -> List[Any]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to be parsed.

    leading_key: str, default 'MRSM'
        The prefix which marks a substitution.

    begin_key: str, default '{'
        The string which opens the list of keys.

    end_key: str, default '}'
        The string which closes the list of keys.

    delimeter: str, default ':'
        The string which separates the keys.

    Returns
    -------
    A list of the keys between `begin_key` and `end_key`.
    If `value` does not begin with `leading_key`, return `value` unchanged.
    """
    if not value.startswith(leading_key):
        return value

    ### Strip the prefix and the enclosing keys from the value itself (not from `leading_key`).
    inner = value[len(leading_key):]
    if inner.startswith(begin_key):
        inner = inner[len(begin_key):]
    if end_key and inner.endswith(end_key):
        inner = inner[:-len(end_key)]

    return inner.split(delimeter)
Parse Meerschaum substitution syntax
E.g. MRSM{value1:value2} => ['value1', 'value2']
NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.
def edit_file(
    path: Union[pathlib.Path, str],
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Launch an editor on a file.

    The editor named by the `EDITOR` environment variable is tried first;
    if launching it raises an exception, fall back to the `pyvim` package.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        The editor command to use when `EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if the editor exited with a return code of 0.
    """
    import subprocess
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor_command = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor_command}'...")
        return_code = subprocess.call([editor_command, path])
    except Exception as e:
        ### The system editor could not be launched.
        if debug:
            dprint(str(e))
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        return_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return return_code == 0
Open a file for editing.
Attempt to launch the user's defined $EDITOR, otherwise use pyvim.
Parameters
- path (Union[pathlib.Path, str]): The path to the file to be edited.
- default_editor (str, default 'pyvim'): If $EDITOR is not set, use this instead. If pyvim is not installed, it will install it from PyPI.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating the file was successfully edited.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Return the terminal's dimensions as `(columns, lines)`.

    If the terminal size cannot be queried, read the `COLUMNS` and `LINES`
    environment variables, falling back to the given defaults.

    Parameters
    ----------
    default_cols: int, default 100
        The number of columns to return if it cannot be determined.

    default_lines: int, default 120
        The number of lines to return if it cannot be determined.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    try:
        terminal_size = os.get_terminal_size()
    except Exception:
        environment = os.environ
        return (
            int(environment.get('COLUMNS', str(default_cols))),
            int(environment.get('LINES', str(default_lines))),
        )
    return terminal_size.columns, terminal_size.lines
Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).
Parameters
- default_cols (int, default 100): If the columns cannot be determined, return this value.
- default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
- A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Group an iterable's items into fixed-size tuples.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to iterate over in chunks.

    chunksize: int
        The number of items in each chunk.

    fillvalue: Optional[Any], default None
        The value used to pad the last chunk if the items do not divide evenly.

    Returns
    -------
    An iterator of tuples of size `chunksize`.

    """
    import itertools

    ### Every position of a chunk pulls from the same underlying iterator,
    ### so each tuple consumes `chunksize` consecutive items.
    shared_iterator = iter(iterable)
    positions = [shared_iterator for _ in range(chunksize)]
    return itertools.zip_longest(*positions, fillvalue=fillvalue)
Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
Parameters
- iterable (Iterable[Any]): The iterable to iterate over in chunks.
- chunksize (int): The size of chunks to iterate with.
- fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
- A generator of tuples of size chunksize.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Return a new dictionary whose items are ordered by their values.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary sorted by value, or `d` itself if its values cannot be sorted.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        ordered_pairs = sorted(d.items(), key=lambda pair: pair[1])
    except Exception:
        return d
    return dict(ordered_pairs)
Sort a dictionary's values and return a new dictionary.
Parameters
- d (Dict[Any, Any]): The dictionary to be sorted.
Returns
- A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def timed_input(
    seconds: int = 10,
    timeout_message: str = "",
    prompt: str = "",
    icon: bool = False,
    **kw
) -> Union[str, None]:
    """
    Prompt for user input, giving up after a number of seconds.

    The timeout is implemented with `signal.SIGALRM`.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        NOTE: This value is not read by the current implementation.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        If `True`, print the configured input icon.
        NOTE: This value is not read by the current implementation.

    Returns
    -------
    The input string entered by the user, or `None` if the window elapsed.

    """
    import signal
    import time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def _raise_on_alarm(signum, frame):
        raise TimeoutExpired

    ### Schedule a SIGALRM to interrupt the prompt after `seconds` seconds.
    signal.signal(signal.SIGALRM, _raise_on_alarm)
    signal.alarm(seconds)

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        ### Input could not be read: show the prompt and wait until the alarm fires.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            return None
    finally:
        ### Cancel the alarm.
        signal.alarm(0)

    return None
Accept user input only for a brief period of time.
Parameters
- seconds (int, default 10): The number of seconds to wait.
- timeout_message (str, default ''): The message to print after the window has elapsed.
- prompt (str, default ''): The prompt to print during the window.
- icon (bool, default False): If True, print the configured input icon.
Returns
- The input string entered by the user.
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    import socket
    from meerschaum.utils.packages import attempt_import

    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent',
        'gevent.socket',
        'gevent.monkey',
    )

    already_patched = socket.socket is gevent_socket.socket
    if already_patched:
        return

    gevent_monkey.patch_all()
Check if gevent monkey patching is enabled, and if not, then apply patching.
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Test a string against a simple email pattern.

    The pattern only accepts lowercase letters and digits (with at most one `.` or `_`)
    before the `@`, and a top-level domain of two or three word characters.

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>

    """
    import re

    email_pattern = re.compile(r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
    match = email_pattern.search(email)
    return match
Check whether a string is a valid email.
Parameters
- email (str): The string to be examined.
Returns
- None if a string is not in email format, otherwise a
re.Match object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
    """
    Return the length of the longest line in a string.

    Parameters
    ----------
    string: str
        The (possibly multi-line) text to measure.

    widest: bool, default True
        Ignored; the widest line is always used.

    Returns
    -------
    The number of characters in the longest newline-separated line.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    ### `str.split` always yields at least one element, so `max` never sees an empty sequence.
    return max(len(line) for line in string.split('\n'))
Calculate the width of a string, either by its widest or last line.
Parameters
- string (str): The string to be examined.
- widest (bool, default True):
No longer used because
widest is always assumed to be true.
Returns
- An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
    """
    Walk a nested dictionary along a tuple of keys and return what is found.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to search.

    path: Tuple[Any, ...]
        The keys to follow, outermost first. An empty path returns `d` itself.

    Returns
    -------
    The value at the end of the path.
    """
    current = d
    for key in path:
        current = current[key]
    return current
Get a value from a dictionary with a tuple of keys.
Parameters
- d (Dict[Any, Any]): The dictionary to search.
- path (Tuple[Any, ...]): The path of keys to traverse.
Returns
- The value from the end of the path.
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
    """
    Assign a value inside a nested dictionary, addressed by a tuple of keys.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to modify in place.

    path: Tuple[Any, ...]
        The keys to follow; every key but the last must already exist.

    val: Any
        The value to store under the final key.
    """
    parent = d
    for key in path[:-1]:
        parent = parent[key]
    parent[path[-1]] = val
Set a value in a dictionary with a tuple of keys.
Parameters
- d (Dict[Any, Any]): The dictionary to search.
- path (Tuple[Any, ...]): The path of keys to traverse.
- val (Any): The value to set at the end of the path.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through. It is not modified.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        This is applied at every nesting level.

    Returns
    -------
    A deep copy of `d` in which the value of any key containing `'password'`
    (case-insensitive) is masked, and the `username:password` portion of any
    `'uri'` value that `SQLConnector.parse_uri` can parse is masked as well.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy
    masked = copy.deepcopy(d)
    for key, value in d.items():
        lowered_key = str(key).lower()
        if isinstance(value, dict):
            ### Forward `replace_with` so nested passwords use the same mask character.
            masked[key] = replace_password(value, replace_with=replace_with)
        elif 'password' in lowered_key:
            masked[key] = replace_with * len(str(value))
        elif lowered_key == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(value)
            except Exception:
                ### Best-effort: leave URIs we cannot parse untouched.
                uri_params = None
            if not uri_params:
                continue
            if 'username' not in uri_params or 'password' not in uri_params:
                continue
            username = uri_params['username']
            password = uri_params['password']
            masked[key] = value.replace(
                username + ':' + password,
                username + ':' + replace_with * len(str(password)),
            )
    return masked
Recursively replace passwords in a dictionary.
Parameters
- d (Dict[str, Any]): The dictionary to search through.
- replace_with (str, default '*'): The string to replace each character of the password with.
Returns
- Another dictionary where values to the keys
'password' - are replaced with
replace_with('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Reduce positional and keyword arguments to those `func` can accept.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function whose signature is inspected.

    *args: Any
        Candidate positional arguments, filtered by `filter_positionals`.

    **kwargs
        Candidate keyword arguments, filtered by `filter_keywords`.

    Returns
    -------
    A tuple of the filtered `args` and the filtered `kwargs`.
    """
    accepted_args = filter_positionals(func, *args)
    accepted_kwargs = filter_keywords(func, **kwargs)
    return accepted_args, accepted_kwargs
Filter out unsupported positional and keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any):
Positional arguments to filter and pass to
func. - **kwargs: Keyword arguments to filter and pass to
func.
Returns
- The
args and kwargs accepted by func.
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, `kw` is returned unfiltered.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    ### Check the parameter kind rather than its string form:
    ### a default or annotation containing '**' is not a `**kwargs` parameter.
    accepts_any_keyword = any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    )
    if accepts_any_keyword:
        return kw
    return {k: v for k, v in kw.items() if k in func_params}
Filter out unsupported keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- **kw (Any):
The arguments to be filtered and passed into
func.
Returns
- A dictionary of keyword arguments accepted by
func.
Examples
>>> def foo(a=1, b=2):
... return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Only the leading parameters of `func` that are positional and have no default
    are considered; the scan stops at the first parameter with a default value,
    at `*args` / `**kwargs`, or at a keyword-only parameter.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect

    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    acceptable_args: List[Any] = []
    num_missing = 0

    for i, param in enumerate(inspect.signature(func).parameters.values()):
        ### Inspect kind and default directly; string-sniffing for '=' or '*' misreads
        ### annotations and lets keyword-only parameters through as positionals.
        if param.kind not in positional_kinds or param.default is not inspect.Parameter.empty:
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_missing += 1

    if num_missing > 0:
        ### Imported lazily: only needed when a warning is actually emitted.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_missing} argument"
            + ('s have' if num_missing != 1 else ' has')
            + " been filled with `None`.",
        )

    return tuple(acceptable_args)
Filter out unsupported positional arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any):
The arguments to be filtered and passed into
func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
- A tuple of positional arguments accepted by
func.
Examples
>>> def foo(a, b):
... return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an `OrderedDict`, converting nested `OrderedDict` values too.
    The input is left untouched.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }
Convert an ordered dict to a dict. Does not mutate the original OrderedDict.
def remove_ansi(s: str) -> str:
    """
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The text to clean.

    Returns
    -------
    The text with every matched escape sequence deleted.

    Examples
    --------
    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
    'Hello, World!'

    """
    import re
    ansi_pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(ansi_pattern, '', s)
Remove ANSI escape characters from a string.
Parameters
- s (str): The string to be cleaned.
Returns
- A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match = True,
    _additional_options: Optional[List[str]] = None,
) -> List[str]:
    """
    Collect `type:label` connector keys from the configuration.

    Parameters
    ----------
    *types: str
        The connector types to include.
        When omitted, every type in the `meerschaum:connectors` configuration is used,
        plus `'plugin'`.

    search_term: str, default ''
        Only keys starting with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, drop a key that is exactly equal to `search_term`.

    _additional_options: Optional[List[str]], default None
        Extra keys appended to the candidates before filtering.

    Returns
    -------
    A sorted list of the matching connector keys.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types)
    if not requested_types:
        requested_types = list(connectors.keys()) + ['plugin']

    candidates = []
    for typ in requested_types:
        if typ == 'plugin':
            from meerschaum.plugins import get_data_plugins
            candidates += [
                f'{typ}:' + plugin.module.__name__.split('.')[-1]
                for plugin in get_data_plugins()
            ]
        else:
            ### The 'default' label is never offered.
            candidates += [
                f'{typ}:{label}'
                for label in connectors.get(typ, {})
                if label != 'default'
            ]

    if _additional_options:
        candidates += _additional_options

    excluded_key = search_term if ignore_exact_match else ''
    return sorted(
        key
        for key in candidates
        if key.startswith(search_term) and key != excluded_key
    )
Read connector labels from the configuration dictionary.
Parameters
- *types (str):
The connector types.
If none are provided, use the defined types (
'sql'and'api') and'plugin'. - search_term (str, default ''): A filter on the connectors' labels.
- ignore_exact_match (bool, default True):
If
True, skip a connector if the search_term is an exact match.
Returns
- A list of the keys of defined connectors.
def wget(
    url: str,
    dest: Optional[Union[str, 'pathlib.Path']] = None,
    headers: Optional[Dict[str, Any]] = None,
    color: bool = True,
    debug: bool = False,
    **kw: Any
) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib.request`.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory, named after the
        `Content-Disposition` filename if present, else the last URL segment.

    headers: Optional[Dict[str, Any]], default None
        Request headers to send.

    color: bool, default True
        If `False`, use plain `print` for debug and error output.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The path to the downloaded file.
    If the download fails, the error is reported and the process exits with status 1.

    """
    import re
    import sys
    import urllib.request
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}
    request = urllib.request.Request(url, headers=headers)
    if not color:
        dprint = print
    if debug:
        dprint(f"Downloading from '{url}'...")

    try:
        response = urllib.request.urlopen(request)
    except Exception:
        ### NOTE(review): SECURITY - this fallback disables HTTPS certificate verification
        ### process-wide after any failure. Kept for backward compatibility; confirm it is wanted.
        import ssl
        ssl._create_default_https_context = ssl._create_unverified_context
        try:
            response = urllib.request.urlopen(request)
        except Exception as _e:
            print(_e)
            response = None

    if response is None or response.code != 200:
        if response is not None:
            response.close()
        error_msg = f"Failed to download from '{url}'."
        if color:
            error(error_msg)
        else:
            print(error_msg)
        sys.exit(1)

    ### Close the connection even if writing the file fails.
    with response:
        disposition = response.headers.get('content-disposition', None)
        ### A disposition header without `filename=` (e.g. "inline") falls back to the URL.
        filenames = re.findall("filename=(.+)", disposition) if disposition is not None else []
        fname = filenames[0].strip('"') if filenames else url.split('/')[-1]

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        with open(dest, 'wb') as f:
            ### Read through the response object (not its raw `fp`)
            ### so chunked transfer encoding is decoded.
            f.write(response.read())

    if debug:
        dprint(f"Downloaded file '{dest}'.")

    return dest
Mimic wget with urllib.request.
Parameters
- url (str): The URL to the resource to be downloaded.
- dest (Optional[Union[str, pathlib.Path]], default None):
The destination path of the downloaded file.
If
None, save to the current directory. - color (bool, default True):
If
debugisTrue, print color output. - debug (bool, default False): Verbosity toggle.
Returns
- The path to the downloaded file.
def async_wrap(func):
    """
    Wrap a synchronous function in a coroutine that runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments, `loop` and
    `executor`; when `loop` is `None`, `asyncio.get_event_loop()` is used.
    """
    import asyncio
    import functools

    @functools.wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        target_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = functools.partial(func, *args, **kwargs)
        return await target_loop.run_in_executor(executor, bound_call)

    return run
Run a synchronous function as async. https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
def debug_trace(browser: bool = True):
    """
    Open a debugger to trace the execution of the program.

    This delegates to `meerschaum.utils.debug.trace`, forwarding `browser`.
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
Open a web-based debugger to trace the execution of the program.
This is an alias for meerschaum.utils.debug.trace.
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list. Any iterable is accepted.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"

    """
    if not items:
        return ''

    ### Materialize once so sets, dict views and generators work like lists.
    items = list(items)
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    a = and_str if and_ else ''
    c = comma_str if commas else ''

    quoted = [q + str(item) + q for item in items]
    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item (including the last) is comma-separated.
    if not a:
        return (c + s).join(quoted)

    if len(quoted) == 2:
        return quoted[0] + s + a + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + a + s + quoted[-1]
Return a formatted string of list items separated by commas.
Parameters
- items ([List[Any]]): The items to be printed as an English list.
- quotes (bool, default True):
If
True, wrap items in quotes. - quote_str (str, default "'"):
If
quotesisTrue, prepend and append each item with this string. - and_ (bool, default True):
If
True, include the word 'and' before the final item in the list. - and_str (str, default 'and'):
If
and_ is True, insert this string where 'and' normally would be in an English list. - oxford_comma (bool, default True):
If
True, include the Oxford Comma (comma before the final 'and'). Only applies whenand_isTrue. - spaces (bool, default True):
If
True, separate items withspace_str - space_str (str, default ' '):
If
spacesisTrue, separate items with this string.
Returns
- A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
        NOTE: when `round_unit` is `False`, an integer-like `delta` is returned as
        `str(delta)` without any unit.

    round_unit: bool, default False
        If `True`, round the output down to a single unit.

    Returns
    -------
    A formatted string, fit for human eyes.
    Negative intervals are formatted by magnitude with `' ago'` appended.
    """
    from meerschaum.utils.packages import attempt_import
    if is_int(str(delta)) and not round_unit:
        return str(delta)

    humanfriendly = attempt_import('humanfriendly', lazy=False)
    delta_seconds = (
        delta.total_seconds()
        if hasattr(delta, 'total_seconds')
        else (delta * 60)
    )

    is_negative = delta_seconds < 0
    delta_seconds = abs(delta_seconds)
    replace_units = {}

    if round_unit:
        if delta_seconds < 1:
            delta_seconds = round(delta_seconds, 2)
        elif delta_seconds < 60:
            delta_seconds = int(delta_seconds)
        elif delta_seconds < 3600:
            delta_seconds = int(delta_seconds / 60) * 60
        elif delta_seconds < 86400:
            delta_seconds = int(delta_seconds / 3600) * 3600
        elif delta_seconds < (86400 * 7):
            delta_seconds = int(delta_seconds / 86400) * 86400
        elif delta_seconds < (86400 * 7 * 4):
            delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7)
        elif delta_seconds < (86400 * 7 * 4 * 13):
            ### Count 4-week "months" but express them as that many weeks,
            ### then relabel the unit below.
            delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7)
            ### Replace the singular stem so both "1 week" and "N weeks" are relabelled.
            replace_units['week'] = 'month'
        else:
            delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364)

    delta_str = humanfriendly.format_timespan(delta_seconds)
    if ',' in delta_str and round_unit:
        delta_str = delta_str.split(',')[0]
    elif ' and ' in delta_str and round_unit:
        delta_str = delta_str.split(' and ')[0]

    for parsed_unit, replacement_unit in replace_units.items():
        delta_str = delta_str.replace(parsed_unit, replacement_unit)

    return delta_str + (' ago' if is_negative else '')
Return a human-readable string for a timedelta (or int minutes).
Parameters
- delta (Union[timedelta, int]):
The interval to print. If
deltais an integer, assume it corresponds to minutes. - round_unit (bool, default False):
If
True, round the output to a single unit.
Returns
- A formatted string, fit for human eyes.
def is_docker_available() -> bool:
    """Return `True` if `docker ps` runs and exits with status 0."""
    import subprocess
    try:
        exit_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if we can connect to the Docker engine.
def is_android() -> bool:
    """Return `True` if `sys` exposes `getandroidapilevel` (i.e. the platform is Android)."""
    import sys
    try:
        sys.getandroidapilevel
    except AttributeError:
        return False
    return True
Return True if the current platform is Android.
def is_bcp_available() -> bool:
    """Return `True` if the MSSQL `bcp` utility runs (`bcp -v`) and exits with status 0."""
    import subprocess
    try:
        exit_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if the MSSQL bcp utility is installed.
def is_systemd_available() -> bool:
    """Return `True` if the directory `/run/systemd/system` exists."""
    systemd_runtime_dir = '/run/systemd/system'
    return os.path.isdir(systemd_runtime_dir)
Check if running on systemd.
def is_tmux_available() -> bool:
    """
    Return `True` if `tmux -V` runs and exits with status 0.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        ### A missing binary (`FileNotFoundError`) or any other failure means "not available".
        return False
    return exit_code == 0
Check if tmux is installed.
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file, like the `tail` command.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is read backwards one byte at a time and split on `\\n`.

    Parameters
    ----------
    file_name: str
        Path of the file to read (opened in binary mode, decoded as UTF-8).

    N: int
        The maximum number of lines to return.
        NOTE: the line count is only checked after a line is collected, so `N <= 0`
        returns every line.

    Returns
    -------
    A list of the decoded lines in file order.
    A trailing newline in the file yields an empty string as the last element.
    """
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at the end of the file and walk backwards.
        read_obj.seek(0, os.SEEK_END)
        ### Bytes of the current line, accumulated in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Restore byte order BEFORE decoding so multi-byte UTF-8 stays intact.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        ### The start of the file was reached: whatever is buffered is the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    `f` is an open, seekable file object. The window read from the end of the file
    grows until it holds more than the requested number of lines (so the first,
    possibly partial, line of the window is never returned) or until the whole
    file has been read. If the end-relative seek raises `IOError`, the file is
    read from the beginning instead.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### Seek offsets must be integers; the growth factor below produces floats.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### `to_read <= 0` cannot be satisfied by reading further, so stop immediately.
        if len(lines) > to_read or pos == 0 or to_read <= 0:
            return (
                lines[-to_read:(-offset if offset else None)],
                len(lines) > to_read or pos > 0,
            )
        avg_line_length *= 1.3
https://stackoverflow.com/a/692616/9699829
Reads n lines from f with an offset of offset lines. The return
value is a tuple in the form (lines, has_more) where has_more is
an indicator that is True if there are more lines in the file.
1299def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str: 1300 """ 1301 Remove characters from each section of a string until the length is within the limit. 1302 1303 Parameters 1304 ---------- 1305 item: str 1306 The item name to be truncated. 1307 1308 delimeter: str, default '_' 1309 Split `item` by this string into several sections. 1310 1311 max_len: int, default 128 1312 The max acceptable length of the truncated version of `item`. 1313 1314 Returns 1315 ------- 1316 The truncated string. 1317 1318 Examples 1319 -------- 1320 >>> truncate_string_sections('abc_def_ghi', max_len=10) 1321 'ab_de_gh' 1322 1323 """ 1324 if len(item) < max_len: 1325 return item 1326 1327 def _shorten(s: str) -> str: 1328 return s[:-1] if len(s) > 1 else s 1329 1330 sections = list(enumerate(item.split(delimeter))) 1331 sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1]))) 1332 available_chars = max_len - len(sections) 1333 1334 _sections = [(i, s) for i, s in sorted_sections] 1335 _sections_len = sum([len(s) for i, s in _sections]) 1336 _old_sections_len = _sections_len 1337 while _sections_len > available_chars: 1338 _sections = [(i, _shorten(s)) for i, s in _sections] 1339 _old_sections_len = _sections_len 1340 _sections_len = sum([len(s) for i, s in _sections]) 1341 if _old_sections_len == _sections_len: 1342 raise Exception(f"String could not be truncated: '{item}'") 1343 1344 new_sections = sorted(_sections, key=lambda x: x[0]) 1345 return delimeter.join([s for i, s in new_sections])
Remove characters from each section of a string until the length is within the limit.
Parameters
- item (str): The item name to be truncated.
- delimeter (str, default '_'):
Split
itemby this string into several sections. - max_len (int, default 128):
The max acceptable length of the truncated version of
item.
Returns
- The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string appended to the shortened `text` to indicate truncation.
        Its length counts toward `max_length`.

    Returns
    -------
    A string of length `max_length` or less.
    """
    if len(text) <= max_length:
        return text

    limit = max(max_length, 0)
    ### Never let the slice index go negative when the suffix is longer than the limit.
    keep = max(limit - len(suffix), 0)
    ### The final slice guarantees the documented bound even for tiny limits.
    return (text[:keep] + suffix)[:limit]
Truncate a potentially long string for display purposes.
Parameters
- text (str): The string to be truncated.
- max_length (int, default 50):
The maximum length of
textbefore truncation. - suffix (str, default '…'):
The string to append to the length of
textto indicate truncation.
Returns
- A string of length
max_lengthor less.
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        The values to sort into the two lists.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix go into the second list,
        with the prefix removed.
        If `None`, the prefix is read from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    Two lists: the included values (unchanged) and the excluded values
    (as strings, without the prefix).
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    prefix_length = len(negation_prefix)
    for val in vals:
        val_str = str(val)
        if val_str.startswith(negation_prefix):
            excluded.append(val_str[prefix_length:])
        else:
            included.append(val)

    return included, excluded
Separate the negated values from the positive ones. Return two lists: positive and negative values.
Parameters
- vals (Union[List[str], Tuple[str]]): A list of strings to parse.
- negation_prefix (Optional[str], default None):
Include values that start with this string in the second list.
If
None, use the system default (_).
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Split each value of a params dictionary into include- and exclude-lists.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary. Values that are not a list, tuple, set,
        or an object with an `astype` attribute are treated as a one-item list.

    Returns
    -------
    A dictionary mapping each key to the tuple returned by `separate_negation_values`.
    An empty or `None` input yields an empty dictionary.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        is_collection = isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
        in_ex_params[col] = separate_negation_values(val if is_collection else [val])
    return in_ex_params
Translate a params dictionary into lists of include- and exclude-values.
Parameters
- params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
- A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Lazily yield the items of an arbitrarily nested list, depth-first.

    Only `list` instances are descended into; every other item is yielded as-is.
    This is a generator: wrap the call in `list()` to materialize the result.
    """
    ### Each stack entry is the iterator of a list we are partway through.
    pending = [iter(list_)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                ### Descend; the outer iterator resumes once the inner one is done.
                pending.append(iter(item))
                break
            yield item
        else:
            pending.pop()
Recursively flatten a list.
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The string is parsed as the argument list of a call expression, so values may
    themselves contain commas or equals signs (lists, dicts, quoted strings).
    Every value must be a Python literal; nothing is executed.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).
    An empty or whitespace-only string yields `((), {})`.

    Raises
    ------
    SyntaxError
        If `args_str` is not a syntactically valid argument list.

    ValueError
        If a value is not a literal, or `*` / `**` unpacking is used.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], sep=","')
    (([1, 2],), {'sep': ','})
    """
    import ast

    if not args_str.strip():
        return (), {}

    ### Let the Python parser split the arguments instead of splitting on ',' and '='.
    ### The closing parenthesis is on its own line so a trailing comment cannot swallow it.
    call = ast.parse(f"_({args_str}\n)", mode='eval').body
    if not isinstance(call, ast.Call) or not isinstance(call.func, ast.Name):
        raise ValueError(f"Invalid arguments string: '{args_str}'")

    args = tuple(ast.literal_eval(node) for node in call.args)
    kwargs = {}
    for keyword in call.keywords:
        if keyword.arg is None:
            raise ValueError("Keyword unpacking (`**`) is not supported.")
        kwargs[keyword.arg] = ast.literal_eval(keyword.value)

    return args, kwargs
Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.
Parameters
- args_str (str): The contents of the function parameter (as a string).
Returns
- A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
((123, 456), {'foo': 789, 'bar': 'baz'})
def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Create a symlink at `dest_path` pointing to `src_path`
    (via `pathlib.Path.symlink_to`), with fallbacks for Windows.

    On Windows, if the symlink cannot be created, a junction is created for
    directories and files are copied instead.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path.

    dest_path: pathlib.Path
        The destination path.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    already_linked = dest_path.exists() and dest_path.resolve() == src_path.resolve()
    if already_linked:
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows has fallbacks; everywhere else, report the error.
    import platform
    if platform.system() != 'Windows':
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories may be linked with a junction.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)
    return True, "Success"
Wrap around pathlib.Path.symlink_to, but add support for Windows.
Parameters
- src_path (pathlib.Path): The source path.
- dest_path (pathlib.Path): The destination path.
Returns
- A SuccessTuple indicating success.
def is_symlink(path: pathlib.Path) -> bool:
    """
    Return whether `path` is a symlink, extending `path.is_symlink()`
    with support for Windows junctions.
    """
    if not path.is_symlink():
        import platform
        if platform.system() != 'Windows':
            return False

        ### On Windows, fall back to `os.readlink()`, which fails with `OSError`
        ### when the path cannot be read as a link.
        try:
            link_target = os.readlink(path)
        except OSError:
            return False
        return bool(link_target)

    return True
Wrap path.is_symlink() but add support for Windows junctions.
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    The decorated decorator `dec` must accept the function to wrap as its first
    argument; any arguments given at decoration time are passed after it.
    The returned decorator factory keeps the name and docstring of `dec`.

    https://stackoverflow.com/a/26151604/9699829

    Examples
    --------
    >>> @parametrized
    ... def multiply(f, factor):
    ...     def wrapper(*args, **kwargs):
    ...         return factor * f(*args, **kwargs)
    ...     return wrapper
    ...
    >>> @multiply(3)
    ... def one():
    ...     return 1
    ...
    >>> one()
    3
    """
    ### Preserve the metadata (`__name__`, `__doc__`, etc.) of the original decorator.
    @functools.wraps(dec)
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl
    return layer
A meta-decorator for allowing other decorator functions to have parameters.
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member of the archive would be written outside of `output_dir`.
    Nothing is extracted in that case.
    """
    abs_output_dir = os.path.abspath(output_dir)

    def is_within_directory(target) -> bool:
        abs_target = os.path.abspath(target)
        ### NOTE: Compare whole path components (`commonpath`), not characters
        ### (`commonprefix`); otherwise a sibling such as `/tmp/outside`
        ### would be accepted for the directory `/tmp/out`.
        try:
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            ### Raised when the paths cannot be compared (e.g. different drives on Windows).
            return False

    ### Validate every member before extracting anything.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir)
Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.
Parameters
- tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
- output_dir (Union[str, pathlib.Path]): The output directory.
def to_snake_case(name: str) -> str:
    """
    Convert the given text to snake case.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re

    ### Applied in order: split before capitalized words, split lower/digit-to-upper
    ### boundaries, drop punctuation, then collapse whitespace into underscores.
    substitutions = (
        (r'(.)([A-Z][a-z]+)', r'\1_\2'),
        (r'([a-z0-9])([A-Z])', r'\1_\2'),
        (r'[^\w\s]', ''),
        (r'\s+', '_'),
    )

    snake_name = name
    for pattern, replacement in substitutions:
        snake_name = re.sub(pattern, replacement, snake_name)

    return snake_name.lower()
Return the given string in snake-case-style.
Parameters
- name (str): The input text to convert to snake case.
Returns
- A snake-case version of `name`.
Examples
>>> to_snake_case("HelloWorld!")
'hello_world'
>>> to_snake_case("This has spaces in it.")
'this_has_spaces_in_it'
>>> to_snake_case("already_in_snake_case")
'already_in_snake_case'
def get_directory_size(path: pathlib.Path) -> int:
    """
    Return the cumulative size of a directory's files in bytes.
    https://stackoverflow.com/a/55659577/9699829

    Parameters
    ----------
    path: pathlib.Path
        The directory to search recursively.

    Returns
    -------
    The sum of the sizes (in bytes) of all files beneath `path`.
    Directory entries and dangling symlinks are not counted.
    """
    ### NOTE: `is_file()` skips directories (whose own `st_size` is not file content)
    ### and dangling symlinks (for which `stat()` would raise `FileNotFoundError`).
    return sum(
        file.stat().st_size
        for file in path.rglob('*')
        if file.is_file()
    )
Return the cumulative size of a directory's files in bytes. https://stackoverflow.com/a/55659577/9699829
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy alias kept for backwards compatibility.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`.
    """
    ### Imported lazily, at call time.
    from meerschaum.actions import choose_subaction as _actions_choose_subaction
    return _actions_choose_subaction(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See meerschaum.actions.choose_subaction.
def print_options(*args, **kwargs) -> None:
    """
    Legacy alias kept for backwards compatibility.
    All arguments are forwarded to `meerschaum.utils.formatting.print_options`.
    """
    ### Imported lazily, at call time.
    from meerschaum.utils.formatting import print_options as _formatting_print_options
    return _formatting_print_options(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See meerschaum.utils.formatting.print_options.
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).
    Intended for use as the `default` argument of `json.dumps()`;
    the work is delegated to `meerschaum.utils.dtypes.serialize_datetime`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime as _serialize_datetime
    serialized = _serialize_datetime(dt)
    return serialized
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def flatten_pipes_dict(*args, **kwargs):
    """
    Legacy alias kept for backwards compatibility.
    All arguments are forwarded to `meerschaum.utils.pipes.flatten_pipes_dict`.
    """
    ### Alias the import so it does not shadow this wrapper's own name.
    from meerschaum.utils.pipes import flatten_pipes_dict as _flatten_pipes_dict
    return _flatten_pipes_dict(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See meerschaum.utils.pipes.flatten_pipes_dict.