meerschaum.utils.misc
Miscellaneous functions go here
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4""" 5Miscellaneous functions go here 6""" 7 8from __future__ import annotations 9 10import os 11import sys 12import functools 13import pathlib 14from datetime import timedelta, datetime 15 16from meerschaum.utils.typing import ( 17 Union, 18 Any, 19 Callable, 20 Optional, 21 List, 22 Dict, 23 SuccessTuple, 24 Iterable, 25 PipesDict, 26 Tuple, 27 TYPE_CHECKING, 28) 29if TYPE_CHECKING: 30 import collections 31 32__pdoc__: Dict[str, bool] = { 33 'to_pandas_dtype': False, 34 'filter_unseen_df': False, 35 'add_missing_cols_to_df': False, 36 'parse_df_datetimes': False, 37 'df_from_literal': False, 38 'get_json_cols': False, 39 'get_unhashable_cols': False, 40 'enforce_dtypes': False, 41 'get_datetime_bound_from_df': False, 42 'df_is_chunk_generator': False, 43 'choices_docstring': False, 44 '_get_subaction_names': False, 45 'is_pipe_registered': False, 46 'replace_pipes_in_dict': False, 47 'round_time': False, 48} 49 50 51def add_method_to_class( 52 func: Callable[[Any], Any], 53 class_def: 'Class', 54 method_name: Optional[str] = None, 55 keep_self: Optional[bool] = None, 56) -> Callable[[Any], Any]: 57 """ 58 Add function `func` to class `class_def`. 59 60 Parameters 61 ---------- 62 func: Callable[[Any], Any] 63 Function to be added as a method of the class 64 65 class_def: Class 66 Class to be modified. 67 68 method_name: Optional[str], default None 69 New name of the method. None will use func.__name__ (default). 70 71 Returns 72 ------- 73 The modified function object. 
74 75 """ 76 from functools import wraps 77 78 is_class = isinstance(class_def, type) 79 80 @wraps(func) 81 def wrapper(self, *args, **kw): 82 return func(*args, **kw) 83 84 if method_name is None: 85 method_name = func.__name__ 86 87 setattr(class_def, method_name, ( 88 wrapper if ((is_class and keep_self is None) or keep_self is False) else func 89 ) 90 ) 91 92 return func 93 94 95def generate_password(length: int = 12) -> str: 96 """ 97 Generate a secure password of given length. 98 99 Parameters 100 ---------- 101 length: int, default 12 102 The length of the password. 103 104 Returns 105 ------- 106 A random password string. 107 """ 108 import secrets 109 import string 110 return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length))) 111 112 113def is_int(s: str) -> bool: 114 """ 115 Check if string is an int. 116 117 Parameters 118 ---------- 119 s: str 120 The string to be checked. 121 122 Returns 123 ------- 124 A bool indicating whether the string was able to be cast to an integer. 125 126 """ 127 try: 128 return float(s).is_integer() 129 except Exception: 130 return False 131 132 133def is_uuid(s: str) -> bool: 134 """ 135 Check if a string is a valid UUID. 136 137 Parameters 138 ---------- 139 s: str 140 The string to be checked. 141 142 Returns 143 ------- 144 A bool indicating whether the string is a valid UUID. 145 """ 146 import uuid 147 try: 148 uuid.UUID(str(s)) 149 return True 150 except Exception: 151 return False 152 153 154def string_to_dict(params_string: str) -> Dict[str, Any]: 155 """ 156 Parse a string into a dictionary. 157 158 If the string begins with '{', parse as JSON. Otherwise use simple parsing. 159 160 Parameters 161 ---------- 162 params_string: str 163 The string to be parsed. 164 165 Returns 166 ------- 167 The parsed dictionary. 
168 169 Examples 170 -------- 171 >>> string_to_dict("a:1,b:2") 172 {'a': 1, 'b': 2} 173 >>> string_to_dict('{"a": 1, "b": 2}') 174 {'a': 1, 'b': 2} 175 176 """ 177 if not params_string: 178 return {} 179 180 import json 181 182 ### Kind of a weird edge case. 183 ### In the generated compose file, there is some weird escaping happening, 184 ### so the string to be parsed starts and ends with a single quote. 185 if ( 186 isinstance(params_string, str) 187 and len(params_string) > 4 188 and params_string[1] == "{" 189 and params_string[-2] == "}" 190 ): 191 return json.loads(params_string[1:-1]) 192 193 if str(params_string).startswith('{'): 194 return json.loads(params_string) 195 196 import ast 197 params_dict = {} 198 199 items = [] 200 bracket_level = 0 201 brace_level = 0 202 current_item = '' 203 in_quotes = False 204 quote_char = '' 205 206 i = 0 207 while i < len(params_string): 208 char = params_string[i] 209 210 if in_quotes: 211 if char == quote_char and (i == 0 or params_string[i-1] != '\\'): 212 in_quotes = False 213 else: 214 if char in ('"', "'"): 215 in_quotes = True 216 quote_char = char 217 elif char == '[': 218 bracket_level += 1 219 elif char == ']': 220 bracket_level -= 1 221 elif char == '{': 222 brace_level += 1 223 elif char == '}': 224 brace_level -= 1 225 elif char == ',' and bracket_level == 0 and brace_level == 0: 226 items.append(current_item) 227 current_item = '' 228 i += 1 229 continue 230 231 current_item += char 232 i += 1 233 234 if current_item: 235 items.append(current_item) 236 237 for param in items: 238 param = param.strip() 239 if not param: 240 continue 241 242 _keys = param.split(":", maxsplit=1) 243 if len(_keys) != 2: 244 continue 245 246 keys = _keys[:-1] 247 try: 248 val = ast.literal_eval(_keys[-1]) 249 except Exception: 250 val = str(_keys[-1]) 251 252 c = params_dict 253 for _k in keys[:-1]: 254 try: 255 k = ast.literal_eval(_k) 256 except Exception: 257 k = str(_k) 258 if k not in c: 259 c[k] = {} 260 c = c[k] 261 
262 c[keys[-1]] = val 263 264 return params_dict 265 266 267def to_simple_dict(doc: Dict[str, Any]) -> str: 268 """ 269 Serialize a document dictionary in simple-dict format. 270 """ 271 import json 272 import ast 273 from meerschaum.utils.dtypes import json_serialize_value 274 275 def serialize_value(value): 276 if isinstance(value, str): 277 try: 278 evaluated = ast.literal_eval(value) 279 if not isinstance(evaluated, str): 280 return json.dumps(value, separators=(',', ':'), default=json_serialize_value) 281 return value 282 except (ValueError, SyntaxError, TypeError, MemoryError): 283 return value 284 285 return json.dumps(value, separators=(',', ':'), default=json_serialize_value) 286 287 return ','.join(f"{key}:{serialize_value(val)}" for key, val in doc.items()) 288 289 290def parse_config_substitution( 291 value: str, 292 leading_key: str = 'MRSM', 293 begin_key: str = '{', 294 end_key: str = '}', 295 delimeter: str = ':', 296) -> List[Any]: 297 """ 298 Parse Meerschaum substitution syntax 299 E.g. MRSM{value1:value2} => ['value1', 'value2'] 300 NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`. 301 """ 302 if not value.beginswith(leading_key): 303 return value 304 305 return leading_key[len(leading_key):][len():-1].split(delimeter) 306 307 308def edit_file( 309 path: Union[pathlib.Path, str], 310 default_editor: str = 'pyvim', 311 debug: bool = False 312) -> bool: 313 """ 314 Open a file for editing. 315 316 Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`. 317 318 Parameters 319 ---------- 320 path: Union[pathlib.Path, str] 321 The path to the file to be edited. 322 323 default_editor: str, default 'pyvim' 324 If `$EDITOR` is not set, use this instead. 325 If `pyvim` is not installed, it will install it from PyPI. 326 327 debug: bool, default False 328 Verbosity toggle. 329 330 Returns 331 ------- 332 A bool indicating the file was successfully edited. 
333 """ 334 from subprocess import call 335 from meerschaum.utils.debug import dprint 336 from meerschaum.utils.packages import run_python_package, attempt_import, package_venv 337 try: 338 EDITOR = os.environ.get('EDITOR', default_editor) 339 if debug: 340 dprint(f"Opening file '{path}' with editor '{EDITOR}'...") 341 rc = call([EDITOR, path]) 342 except Exception as e: ### can't open with default editors 343 if debug: 344 dprint(str(e)) 345 dprint('Failed to open file with system editor. Falling back to pyvim...') 346 pyvim = attempt_import('pyvim', lazy=False) 347 rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug) 348 return rc == 0 349 350 351def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]: 352 """ 353 Determine the columns and lines in the terminal. 354 If they cannot be determined, return the default values (100 columns and 120 lines). 355 356 Parameters 357 ---------- 358 default_cols: int, default 100 359 If the columns cannot be determined, return this value. 360 361 default_lines: int, default 120 362 If the lines cannot be determined, return this value. 363 364 Returns 365 ------- 366 A tuple if integers for the columns and lines. 367 """ 368 try: 369 size = os.get_terminal_size() 370 _cols, _lines = size.columns, size.lines 371 except Exception: 372 _cols, _lines = ( 373 int(os.environ.get('COLUMNS', str(default_cols))), 374 int(os.environ.get('LINES', str(default_lines))), 375 ) 376 return _cols, _lines 377 378 379def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None): 380 """ 381 Iterate over a list in chunks. 382 https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks 383 384 Parameters 385 ---------- 386 iterable: Iterable[Any] 387 The iterable to iterate over in chunks. 388 389 chunksize: int 390 The size of chunks to iterate with. 
391 392 fillvalue: Optional[Any], default None 393 If the chunks do not evenly divide into the iterable, pad the end with this value. 394 395 Returns 396 ------- 397 A generator of tuples of size `chunksize`. 398 399 """ 400 from itertools import zip_longest 401 args = [iter(iterable)] * chunksize 402 return zip_longest(*args, fillvalue=fillvalue) 403 404def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]: 405 """ 406 Sort a dictionary's values and return a new dictionary. 407 408 Parameters 409 ---------- 410 d: Dict[Any, Any] 411 The dictionary to be sorted. 412 413 Returns 414 ------- 415 A sorted dictionary. 416 417 Examples 418 -------- 419 >>> sorted_dict({'b': 1, 'a': 2}) 420 {'b': 1, 'a': 2} 421 >>> sorted_dict({'b': 2, 'a': 1}) 422 {'a': 1, 'b': 2} 423 424 """ 425 try: 426 return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])} 427 except Exception: 428 return d 429 430def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]: 431 """ 432 Convert the standard pipes dictionary into a list. 433 434 Parameters 435 ---------- 436 pipes_dict: PipesDict 437 The pipes dictionary to be flattened. 438 439 Returns 440 ------- 441 A list of `Pipe` objects. 442 443 """ 444 pipes_list = [] 445 for ck in pipes_dict.values(): 446 for mk in ck.values(): 447 pipes_list += list(mk.values()) 448 return pipes_list 449 450 451def timed_input( 452 seconds: int = 10, 453 timeout_message: str = "", 454 prompt: str = "", 455 icon: bool = False, 456 **kw 457) -> Union[str, None]: 458 """ 459 Accept user input only for a brief period of time. 460 461 Parameters 462 ---------- 463 seconds: int, default 10 464 The number of seconds to wait. 465 466 timeout_message: str, default '' 467 The message to print after the window has elapsed. 468 469 prompt: str, default '' 470 The prompt to print during the window. 471 472 icon: bool, default False 473 If `True`, print the configured input icon. 
474 475 476 Returns 477 ------- 478 The input string entered by the user. 479 480 """ 481 import signal, time 482 483 class TimeoutExpired(Exception): 484 """Raise this exception when the timeout is reached.""" 485 486 def alarm_handler(signum, frame): 487 raise TimeoutExpired 488 489 # set signal handler 490 signal.signal(signal.SIGALRM, alarm_handler) 491 signal.alarm(seconds) # produce SIGALRM in `timeout` seconds 492 493 try: 494 return input(prompt) 495 except TimeoutExpired: 496 return None 497 except (EOFError, RuntimeError): 498 try: 499 print(prompt) 500 time.sleep(seconds) 501 except TimeoutExpired: 502 return None 503 finally: 504 signal.alarm(0) # cancel alarm 505 506 507def enforce_gevent_monkey_patch(): 508 """ 509 Check if gevent monkey patching is enabled, and if not, then apply patching. 510 """ 511 from meerschaum.utils.packages import attempt_import 512 import socket 513 gevent, gevent_socket, gevent_monkey = attempt_import( 514 'gevent', 'gevent.socket', 'gevent.monkey' 515 ) 516 if not socket.socket is gevent_socket.socket: 517 gevent_monkey.patch_all() 518 519def is_valid_email(email: str) -> Union['re.Match', None]: 520 """ 521 Check whether a string is a valid email. 522 523 Parameters 524 ---------- 525 email: str 526 The string to be examined. 527 528 Returns 529 ------- 530 None if a string is not in email format, otherwise a `re.Match` object, which is truthy. 531 532 Examples 533 -------- 534 >>> is_valid_email('foo') 535 >>> is_valid_email('foo@foo.com') 536 <re.Match object; span=(0, 11), match='foo@foo.com'> 537 538 """ 539 import re 540 regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$' 541 return re.search(regex, email) 542 543 544def string_width(string: str, widest: bool = True) -> int: 545 """ 546 Calculate the width of a string, either by its widest or last line. 547 548 Parameters 549 ---------- 550 string: str: 551 The string to be examined. 
552 553 widest: bool, default True 554 No longer used because `widest` is always assumed to be true. 555 556 Returns 557 ------- 558 An integer for the text's visual width. 559 560 Examples 561 -------- 562 >>> string_width('a') 563 1 564 >>> string_width('a\\nbc\\nd') 565 2 566 567 """ 568 def _widest(): 569 words = string.split('\n') 570 max_length = 0 571 for w in words: 572 length = len(w) 573 if length > max_length: 574 max_length = length 575 return max_length 576 577 return _widest() 578 579def _pyinstaller_traverse_dir( 580 directory: str, 581 ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), 582 include_dotfiles: bool = False 583) -> list: 584 """ 585 Recursively traverse a directory and return a list of its contents. 586 """ 587 paths = [] 588 _directory = pathlib.Path(directory) 589 590 def _found_pattern(name: str): 591 for pattern in ignore_patterns: 592 if pattern.replace('/', os.path.sep) in str(name): 593 return True 594 return False 595 596 for root, dirs, files in os.walk(_directory): 597 _root = str(root)[len(str(_directory.parent)):] 598 if _root.startswith(os.path.sep): 599 _root = _root[len(os.path.sep):] 600 if _root.startswith('.') and not include_dotfiles: 601 continue 602 ### ignore certain patterns 603 if _found_pattern(_root): 604 continue 605 606 for filename in files: 607 if filename.startswith('.') and not include_dotfiles: 608 continue 609 path = os.path.join(root, filename) 610 if _found_pattern(path): 611 continue 612 613 _path = str(path)[len(str(_directory.parent)):] 614 if _path.startswith(os.path.sep): 615 _path = _path[len(os.path.sep):] 616 _path = os.path.sep.join(_path.split(os.path.sep)[:-1]) 617 618 paths.append((path, _path)) 619 return paths 620 621 622def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any: 623 """ 624 Get a value from a dictionary with a tuple of keys. 625 626 Parameters 627 ---------- 628 d: Dict[Any, Any] 629 The dictionary to search. 
630 631 path: Tuple[Any, ...] 632 The path of keys to traverse. 633 634 Returns 635 ------- 636 The value from the end of the path. 637 """ 638 return functools.reduce(lambda di, key: di[key], path, d) 639 640 641def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None: 642 """ 643 Set a value in a dictionary with a tuple of keys. 644 645 Parameters 646 ---------- 647 d: Dict[Any, Any] 648 The dictionary to search. 649 650 path: Tuple[Any, ...] 651 The path of keys to traverse. 652 653 val: Any 654 The value to set at the end of the path. 655 """ 656 get_val_from_dict_path(d, path[:-1])[path[-1]] = val 657 658 659def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]: 660 """ 661 Recursively replace passwords in a dictionary. 662 663 Parameters 664 ---------- 665 d: Dict[str, Any] 666 The dictionary to search through. 667 668 replace_with: str, default '*' 669 The string to replace each character of the password with. 670 671 Returns 672 ------- 673 Another dictionary where values to the keys `'password'` 674 are replaced with `replace_with` (`'*'`). 
675 676 Examples 677 -------- 678 >>> replace_password({'a': 1}) 679 {'a': 1} 680 >>> replace_password({'password': '123'}) 681 {'password': '***'} 682 >>> replace_password({'nested': {'password': '123'}}) 683 {'nested': {'password': '***'}} 684 >>> replace_password({'password': '123'}, replace_with='!') 685 {'password': '!!!'} 686 687 """ 688 import copy 689 _d = copy.deepcopy(d) 690 for k, v in d.items(): 691 if isinstance(v, dict): 692 _d[k] = replace_password(v) 693 elif 'password' in str(k).lower(): 694 _d[k] = ''.join([replace_with for char in str(v)]) 695 elif str(k).lower() == 'uri': 696 from meerschaum.connectors.sql import SQLConnector 697 try: 698 uri_params = SQLConnector.parse_uri(v) 699 except Exception: 700 uri_params = None 701 if not uri_params: 702 continue 703 if not 'username' in uri_params or not 'password' in uri_params: 704 continue 705 _d[k] = v.replace( 706 uri_params['username'] + ':' + uri_params['password'], 707 uri_params['username'] + ':' + ''.join( 708 [replace_with for char in str(uri_params['password'])] 709 ) 710 ) 711 return _d 712 713 714def filter_arguments( 715 func: Callable[[Any], Any], 716 *args: Any, 717 **kwargs: Any 718) -> Tuple[Tuple[Any], Dict[str, Any]]: 719 """ 720 Filter out unsupported positional and keyword arguments. 721 722 Parameters 723 ---------- 724 func: Callable[[Any], Any] 725 The function to inspect. 726 727 *args: Any 728 Positional arguments to filter and pass to `func`. 729 730 **kwargs 731 Keyword arguments to filter and pass to `func`. 732 733 Returns 734 ------- 735 The `args` and `kwargs` accepted by `func`. 736 """ 737 args = filter_positionals(func, *args) 738 kwargs = filter_keywords(func, **kwargs) 739 return args, kwargs 740 741 742def filter_keywords( 743 func: Callable[[Any], Any], 744 **kw: Any 745) -> Dict[str, Any]: 746 """ 747 Filter out unsupported keyword arguments. 748 749 Parameters 750 ---------- 751 func: Callable[[Any], Any] 752 The function to inspect. 
753 754 **kw: Any 755 The arguments to be filtered and passed into `func`. 756 757 Returns 758 ------- 759 A dictionary of keyword arguments accepted by `func`. 760 761 Examples 762 -------- 763 ```python 764 >>> def foo(a=1, b=2): 765 ... return a * b 766 >>> filter_keywords(foo, a=2, b=4, c=6) 767 {'a': 2, 'b': 4} 768 >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6})) 769 8 770 ``` 771 772 """ 773 import inspect 774 func_params = inspect.signature(func).parameters 775 ### If the function has a **kw method, skip filtering. 776 for param, _type in func_params.items(): 777 if '**' in str(_type): 778 return kw 779 return {k: v for k, v in kw.items() if k in func_params} 780 781 782def filter_positionals( 783 func: Callable[[Any], Any], 784 *args: Any 785) -> Tuple[Any]: 786 """ 787 Filter out unsupported positional arguments. 788 789 Parameters 790 ---------- 791 func: Callable[[Any], Any] 792 The function to inspect. 793 794 *args: Any 795 The arguments to be filtered and passed into `func`. 796 NOTE: If the function signature expects more arguments than provided, 797 the missing slots will be filled with `None`. 798 799 Returns 800 ------- 801 A tuple of positional arguments accepted by `func`. 802 803 Examples 804 -------- 805 ```python 806 >>> def foo(a, b): 807 ... return a * b 808 >>> filter_positionals(foo, 2, 4, 6) 809 (2, 4) 810 >>> foo(*filter_positionals(foo, 2, 4, 6)) 811 8 812 ``` 813 814 """ 815 import inspect 816 from meerschaum.utils.warnings import warn 817 func_params = inspect.signature(func).parameters 818 acceptable_args: List[Any] = [] 819 820 def _warn_invalids(_num_invalid): 821 if _num_invalid > 0: 822 warn( 823 "Too few arguments were provided. 
" 824 + f"{_num_invalid} argument" 825 + ('s have ' if _num_invalid != 1 else " has ") 826 + " been filled with `None`.", 827 ) 828 829 num_invalid: int = 0 830 for i, (param, val) in enumerate(func_params.items()): 831 if '=' in str(val) or '*' in str(val): 832 _warn_invalids(num_invalid) 833 return tuple(acceptable_args) 834 835 try: 836 acceptable_args.append(args[i]) 837 except IndexError: 838 acceptable_args.append(None) 839 num_invalid += 1 840 841 _warn_invalids(num_invalid) 842 return tuple(acceptable_args) 843 844 845def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]: 846 """ 847 Convert an ordered dict to a dict. 848 Does not mutate the original OrderedDict. 849 """ 850 from collections import OrderedDict 851 _d = dict(od) 852 for k, v in od.items(): 853 if isinstance(v, OrderedDict) or ( 854 issubclass(type(v), OrderedDict) 855 ): 856 _d[k] = dict_from_od(v) 857 return _d 858 859 860def remove_ansi(s: str) -> str: 861 """ 862 Remove ANSI escape characters from a string. 863 864 Parameters 865 ---------- 866 s: str: 867 The string to be cleaned. 868 869 Returns 870 ------- 871 A string with the ANSI characters removed. 872 873 Examples 874 -------- 875 >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m") 876 'Hello, World!' 877 878 """ 879 import re 880 return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s) 881 882 883def get_connector_labels( 884 *types: str, 885 search_term: str = '', 886 ignore_exact_match = True, 887 _additional_options: Optional[List[str]] = None, 888) -> List[str]: 889 """ 890 Read connector labels from the configuration dictionary. 891 892 Parameters 893 ---------- 894 *types: str 895 The connector types. 896 If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`. 897 898 search_term: str, default '' 899 A filter on the connectors' labels. 900 901 ignore_exact_match: bool, default True 902 If `True`, skip a connector if the search_term is an exact match. 
903 904 Returns 905 ------- 906 A list of the keys of defined connectors. 907 908 """ 909 from meerschaum.config import get_config 910 connectors = get_config('meerschaum', 'connectors') 911 912 _types = list(types) 913 if len(_types) == 0: 914 _types = list(connectors.keys()) + ['plugin'] 915 916 conns = [] 917 for t in _types: 918 if t == 'plugin': 919 from meerschaum.plugins import get_data_plugins 920 conns += [ 921 f'{t}:' + plugin.module.__name__.split('.')[-1] 922 for plugin in get_data_plugins() 923 ] 924 continue 925 conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ] 926 927 if _additional_options: 928 conns += _additional_options 929 930 possibilities = [ 931 c 932 for c in conns 933 if c.startswith(search_term) 934 and c != ( 935 search_term if ignore_exact_match else '' 936 ) 937 ] 938 return sorted(possibilities) 939 940 941def wget( 942 url: str, 943 dest: Optional[Union[str, 'pathlib.Path']] = None, 944 headers: Optional[Dict[str, Any]] = None, 945 color: bool = True, 946 debug: bool = False, 947 **kw: Any 948) -> 'pathlib.Path': 949 """ 950 Mimic `wget` with `requests`. 951 952 Parameters 953 ---------- 954 url: str 955 The URL to the resource to be downloaded. 956 957 dest: Optional[Union[str, pathlib.Path]], default None 958 The destination path of the downloaded file. 959 If `None`, save to the current directory. 960 961 color: bool, default True 962 If `debug` is `True`, print color output. 963 964 debug: bool, default False 965 Verbosity toggle. 966 967 Returns 968 ------- 969 The path to the downloaded file. 
970 971 """ 972 from meerschaum.utils.warnings import warn, error 973 from meerschaum.utils.debug import dprint 974 import re, urllib.request 975 if headers is None: 976 headers = {} 977 request = urllib.request.Request(url, headers=headers) 978 if not color: 979 dprint = print 980 if debug: 981 dprint(f"Downloading from '{url}'...") 982 try: 983 response = urllib.request.urlopen(request) 984 except Exception as e: 985 import ssl 986 ssl._create_default_https_context = ssl._create_unverified_context 987 try: 988 response = urllib.request.urlopen(request) 989 except Exception as _e: 990 print(_e) 991 response = None 992 if response is None or response.code != 200: 993 error_msg = f"Failed to download from '{url}'." 994 if color: 995 error(error_msg) 996 else: 997 print(error_msg) 998 import sys 999 sys.exit(1) 1000 1001 d = response.headers.get('content-disposition', None) 1002 fname = ( 1003 re.findall("filename=(.+)", d)[0].strip('"') if d is not None 1004 else url.split('/')[-1] 1005 ) 1006 1007 if dest is None: 1008 dest = pathlib.Path(os.path.join(os.getcwd(), fname)) 1009 elif isinstance(dest, str): 1010 dest = pathlib.Path(dest) 1011 1012 with open(dest, 'wb') as f: 1013 f.write(response.fp.read()) 1014 1015 if debug: 1016 dprint(f"Downloaded file '{dest}'.") 1017 1018 return dest 1019 1020 1021def async_wrap(func): 1022 """ 1023 Run a synchronous function as async. 1024 https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn 1025 """ 1026 import asyncio 1027 from functools import wraps, partial 1028 1029 @wraps(func) 1030 async def run(*args, loop=None, executor=None, **kwargs): 1031 if loop is None: 1032 loop = asyncio.get_event_loop() 1033 pfunc = partial(func, *args, **kwargs) 1034 return await loop.run_in_executor(executor, pfunc) 1035 return run 1036 1037 1038def debug_trace(browser: bool = True): 1039 """ 1040 Open a web-based debugger to trace the execution of the program. 
1041 1042 This is an alias import for `meerschaum.utils.debug.debug_trace`. 1043 """ 1044 from meerschaum.utils.debug import trace 1045 trace(browser=browser) 1046 1047 1048def items_str( 1049 items: List[Any], 1050 quotes: bool = True, 1051 quote_str: str = "'", 1052 commas: bool = True, 1053 comma_str: str = ',', 1054 and_: bool = True, 1055 and_str: str = 'and', 1056 oxford_comma: bool = True, 1057 spaces: bool = True, 1058 space_str = ' ', 1059) -> str: 1060 """ 1061 Return a formatted string if list items separated by commas. 1062 1063 Parameters 1064 ---------- 1065 items: [List[Any]] 1066 The items to be printed as an English list. 1067 1068 quotes: bool, default True 1069 If `True`, wrap items in quotes. 1070 1071 quote_str: str, default "'" 1072 If `quotes` is `True`, prepend and append each item with this string. 1073 1074 and_: bool, default True 1075 If `True`, include the word 'and' before the final item in the list. 1076 1077 and_str: str, default 'and' 1078 If `and_` is True, insert this string where 'and' normally would in and English list. 1079 1080 oxford_comma: bool, default True 1081 If `True`, include the Oxford Comma (comma before the final 'and'). 1082 Only applies when `and_` is `True`. 1083 1084 spaces: bool, default True 1085 If `True`, separate items with `space_str` 1086 1087 space_str: str, default ' ' 1088 If `spaces` is `True`, separate items with this string. 1089 1090 Returns 1091 ------- 1092 A string of the items as an English list. 
1093 1094 Examples 1095 -------- 1096 >>> items_str([1,2,3]) 1097 "'1', '2', and '3'" 1098 >>> items_str([1,2,3], quotes=False) 1099 '1, 2, and 3' 1100 >>> items_str([1,2,3], and_=False) 1101 "'1', '2', '3'" 1102 >>> items_str([1,2,3], spaces=False, and_=False) 1103 "'1','2','3'" 1104 >>> items_str([1,2,3], oxford_comma=False) 1105 "'1', '2' and '3'" 1106 >>> items_str([1,2,3], quote_str=":") 1107 ':1:, :2:, and :3:' 1108 >>> items_str([1,2,3], and_str="or") 1109 "'1', '2', or '3'" 1110 >>> items_str([1,2,3], space_str="_") 1111 "'1',_'2',_and_'3'" 1112 1113 """ 1114 if not items: 1115 return '' 1116 1117 q = quote_str if quotes else '' 1118 s = space_str if spaces else '' 1119 a = and_str if and_ else '' 1120 c = comma_str if commas else '' 1121 1122 if len(items) == 1: 1123 return q + str(list(items)[0]) + q 1124 1125 if len(items) == 2: 1126 return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q 1127 1128 sep = q + c + s + q 1129 output = q + sep.join(str(i) for i in items[:-1]) + q 1130 if oxford_comma: 1131 output += c 1132 output += s + a + (s if and_ else '') + q + str(items[-1]) + q 1133 return output 1134 1135 1136def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str: 1137 """ 1138 Return a human-readable string for a `timedelta` (or `int` minutes). 1139 1140 Parameters 1141 ---------- 1142 delta: Union[timedelta, int] 1143 The interval to print. If `delta` is an integer, assume it corresponds to minutes. 1144 1145 round_unit: bool, default False 1146 If `True`, round the output to a single unit. 1147 1148 Returns 1149 ------- 1150 A formatted string, fit for human eyes. 
1151 """ 1152 from meerschaum.utils.packages import attempt_import 1153 if is_int(str(delta)) and not round_unit: 1154 return str(delta) 1155 1156 humanfriendly = attempt_import('humanfriendly', lazy=False) 1157 delta_seconds = ( 1158 delta.total_seconds() 1159 if hasattr(delta, 'total_seconds') 1160 else (delta * 60) 1161 ) 1162 1163 is_negative = delta_seconds < 0 1164 delta_seconds = abs(delta_seconds) 1165 replace_units = {} 1166 1167 if round_unit: 1168 if delta_seconds < 1: 1169 delta_seconds = round(delta_seconds, 2) 1170 elif delta_seconds < 60: 1171 delta_seconds = int(delta_seconds) 1172 elif delta_seconds < 3600: 1173 delta_seconds = int(delta_seconds / 60) * 60 1174 elif delta_seconds < 86400: 1175 delta_seconds = int(delta_seconds / 3600) * 3600 1176 elif delta_seconds < (86400 * 7): 1177 delta_seconds = int(delta_seconds / 86400) * 86400 1178 elif delta_seconds < (86400 * 7 * 4): 1179 delta_seconds = int(delta_seconds / (86400 * 7)) * (86400 * 7) 1180 elif delta_seconds < (86400 * 7 * 4 * 13): 1181 delta_seconds = int(delta_seconds / (86400 * 7 * 4)) * (86400 * 7) 1182 replace_units['weeks'] = 'months' 1183 else: 1184 delta_seconds = int(delta_seconds / (86400 * 364)) * (86400 * 364) 1185 1186 delta_str = humanfriendly.format_timespan(delta_seconds) 1187 if ',' in delta_str and round_unit: 1188 delta_str = delta_str.split(',')[0] 1189 elif ' and ' in delta_str and round_unit: 1190 delta_str = delta_str.split(' and ')[0] 1191 1192 for parsed_unit, replacement_unit in replace_units.items(): 1193 delta_str = delta_str.replace(parsed_unit, replacement_unit) 1194 1195 return delta_str + (' ago' if is_negative else '') 1196 1197 1198def is_docker_available() -> bool: 1199 """Check if we can connect to the Docker engine.""" 1200 import subprocess 1201 try: 1202 has_docker = subprocess.call( 1203 ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1204 ) == 0 1205 except Exception: 1206 has_docker = False 1207 return has_docker 1208 1209 
def is_android() -> bool:
    """Return `True` if the current platform is Android."""
    import sys
    return hasattr(sys, 'getandroidapilevel')


def is_bcp_available() -> bool:
    """Check if the MSSQL `bcp` utility is installed."""
    import subprocess

    try:
        has_bcp = subprocess.call(
            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        has_bcp = False
    return has_bcp


def is_systemd_available() -> bool:
    """Check if running on systemd."""
    import subprocess
    try:
        has_systemctl = subprocess.call(
            ['systemctl', 'whoami'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        ) == 0
    except FileNotFoundError:
        ### `systemctl` is simply not installed: not worth a traceback.
        has_systemctl = False
    except Exception:
        import traceback
        traceback.print_exc()
        has_systemctl = False
    return has_systemctl


def is_tmux_available() -> bool:
    """
    Check if `tmux` is installed.
    """
    import subprocess
    try:
        has_tmux = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        ### Covers `FileNotFoundError` (not installed) and any other launch failure.
        has_tmux = False
    return has_tmux


def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file by scanning backwards from its end.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read.

    N: int
        The number of lines to return.

    Returns
    -------
    A list of at most `N` decoded lines in file order, without their newline characters.
    """
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at the end of the file and walk backwards one byte at a time.
        read_obj.seek(0, os.SEEK_END)
        ### Bytes of the current line, collected in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location = pointer_location - 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Reverse the bytes before decoding (not after),
                ### otherwise multi-byte UTF-8 characters are corrupted.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)

        ### The file was read completely: whatever remains is its first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())

    return list(reversed(list_of_lines))


def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### `seek()` only accepts integers, and the estimate becomes a float
            ### once it has been scaled below.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            end_index = -offset if offset else None
            return lines[-to_read:end_index], len(lines) > to_read or pos > 0
        ### Not enough lines were captured: assume longer lines and retry.
        avg_line_length *= 1.3


def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    An `Exception` if the sections cannot be shortened enough to fit.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'

    """
    if len(item) < max_len:
        return item

    def _shorten(s: str) -> str:
        return s[:-1] if len(s) > 1 else s

    ### Split on the requested delimeter (not a hard-coded underscore).
    sections = list(enumerate(item.split(delimeter)))
    sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1])))
    ### Reserve room for the delimeters between the sections.
    available_chars = max_len - (len(sections) * len(delimeter))

    _sections = [(i, s) for i, s in sorted_sections]
    _sections_len = sum([len(s) for i, s in _sections])
    while _sections_len > available_chars:
        _sections = [(i, _shorten(s)) for i, s in _sections]
        _old_sections_len = _sections_len
        _sections_len = sum([len(s) for i, s in _sections])
        ### No section could be shortened any further.
        if _old_sections_len == _sections_len:
            raise Exception(f"String could not be truncated: '{item}'")

    new_sections = sorted(_sections, key=lambda x: x[0])
    return delimeter.join([s for i, s in new_sections])


def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string to append to the length of `text` to indicate truncation.

    Returns
    -------
    A string of length `max_length` or less.
    """
    text_length = len(text)
    if text_length <= max_length:
        return text

    suffix_length = len(suffix)
    ### There is no room for any text: return as much of the suffix as fits
    ### so that the result never exceeds `max_length`.
    if max_length <= suffix_length:
        return suffix[:max(max_length, 0)]

    truncated_text = text[:max_length - suffix_length]
    return truncated_text + suffix


def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Separate the negated values from the positive ones.
    Return two lists: positive and negative values.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Include values that start with this string in the second list.
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of two lists: the included values (unchanged)
    and the excluded values (as strings, with the prefix removed).
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    _in_vals, _ex_vals = [], []
    for v in vals:
        if str(v).startswith(negation_prefix):
            _ex_vals.append(str(v)[len(negation_prefix):])
        else:
            _in_vals.append(v)

    return _in_vals, _ex_vals


def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}
    return {
        col: separate_negation_values(
            (
                val
                if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
                else [val]
            )
        )
        for col, val in params.items()
    }


def flatten_list(list_: List[Any]) -> List[Any]:
    """
    Recursively flatten a list.

    NOTE: This is a generator: it yields the flattened items lazily,
    so wrap the call in `list()` if an actual list is needed.
    """
    for item in list_:
        if isinstance(item, list):
            yield from flatten_list(item)
        else:
            yield item


def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    (123, 456), {'foo': 789, 'bar': 'baz'}
    """
    import ast

    ### Let Python's own parser split the arguments so that commas and equals signs
    ### inside strings, lists, and dictionaries are handled correctly.
    ### Values are only ever evaluated as literals (nothing is executed).
    try:
        call_node = ast.parse(f"_({args_str})", mode='eval').body
    except SyntaxError:
        call_node = None

    if (
        isinstance(call_node, ast.Call)
        and isinstance(call_node.func, ast.Name)
        and all(keyword.arg is not None for keyword in call_node.keywords)
    ):
        return (
            tuple(ast.literal_eval(arg) for arg in call_node.args),
            {
                keyword.arg: ast.literal_eval(keyword.value)
                for keyword in call_node.keywords
            },
        )

    ### Fall back to the legacy comma-splitting for input which is not valid call syntax
    ### (e.g. positional values after keyword values).
    args = []
    kwargs = {}

    for part in args_str.split(','):
        if '=' in part:
            key, val = part.split('=', 1)
            kwargs[key.strip()] = ast.literal_eval(val)
        else:
            args.append(ast.literal_eval(part.strip()))

    return tuple(args), kwargs


def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path.

    dest_path: pathlib.Path
        The destination path.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
        return True, "Symlink already exists."
    try:
        dest_path.symlink_to(src_path)
        success = True
    except Exception as e:
        success = False
        msg = str(e)
    if success:
        return success, "Success"

    ### Failed to create a symlink.
    ### If we're not on Windows, return an error.
    import platform
    if platform.system() != 'Windows':
        return success, msg

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort: copy the file on Windows.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)

    return True, "Success"


def is_symlink(path: pathlib.Path) -> bool:
    """
    Wrap `path.is_symlink()` but add support for Windows junctions.
    """
    if path.is_symlink():
        return True

    import platform
    if platform.system() != 'Windows':
        return False
    try:
        return bool(os.readlink(path))
    except OSError:
        return False


def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl
    return layer


def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a give directory.
    This defends against CVE-2007-4559.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member would be extracted outside of `output_dir`.
    """

    def is_within_directory(directory, target):
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        ### Compare whole path components: a plain string prefix would accept
        ### sibling directories (e.g. `/out-evil` for `/out`).
        try:
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            ### Raised when the paths cannot share a root (e.g. different drives).
            return False
        return common_path == abs_directory

    def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
        ### Validate every member before anything is written to disk.
        for member in tar.getmembers():
            member_path = os.path.join(path, member.name)
            if not is_within_directory(path, member_path):
                raise Exception("Attempted Path Traversal in Tar File")

        tar.extractall(path=path, members=members, numeric_owner=numeric_owner)

    return safe_extract(tarf, output_dir)


def to_snake_case(name: str) -> str:
    """
    Return the given string in snake-case-style.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re
    name = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r'\s+', '_', name)
    return name.lower()


def get_directory_size(path: 'pathlib.Path') -> int:
    """
    Return the cumulative size of a directory's files in bytes.
    https://stackoverflow.com/a/55659577/9699829
    """
    ### Only count regular files: directory entries are not file contents,
    ### and broken symlinks cannot be `stat`-ed.
    return sum(file.stat().st_size for file in path.rglob('*') if file.is_file())


##################
# Legacy imports #
##################

def choose_subaction(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choose_subaction`.
    """
    from meerschaum.actions import choose_subaction as _choose_subactions
    return _choose_subactions(*args, **kwargs)


def print_options(*args, **kwargs) -> None:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.formatting.print_options`.
    """
    from meerschaum.utils.formatting import print_options as _print_options
    return _print_options(*args, **kwargs)


def to_pandas_dtype(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dtypes.to_pandas_dtype`.
    """
    from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype
    return _to_pandas_dtype(*args, **kwargs)


def filter_unseen_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.filter_unseen_df`.
    """
    from meerschaum.utils.dataframe import filter_unseen_df as real_function
    return real_function(*args, **kwargs)


def add_missing_cols_to_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.add_missing_cols_to_df`.
    """
    from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function
    return real_function(*args, **kwargs)


def parse_df_datetimes(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.parse_df_datetimes`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes as real_function
    return real_function(*args, **kwargs)


def df_from_literal(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.df_from_literal`.
    """
    from meerschaum.utils.dataframe import df_from_literal as real_function
    return real_function(*args, **kwargs)


def get_json_cols(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_json_cols`.
    """
    from meerschaum.utils.dataframe import get_json_cols as real_function
    return real_function(*args, **kwargs)


def get_unhashable_cols(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_unhashable_cols`.
    """
    from meerschaum.utils.dataframe import get_unhashable_cols as real_function
    return real_function(*args, **kwargs)


def enforce_dtypes(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.enforce_dtypes`.
    """
    from meerschaum.utils.dataframe import enforce_dtypes as real_function
    return real_function(*args, **kwargs)


def get_datetime_bound_from_df(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.get_datetime_bound_from_df`.
    """
    from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function
    return real_function(*args, **kwargs)


def df_is_chunk_generator(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dataframe.df_is_chunk_generator`.
    """
    from meerschaum.utils.dataframe import df_is_chunk_generator as real_function
    return real_function(*args, **kwargs)


def choices_docstring(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choices_docstring`.
    """
    from meerschaum.actions import choices_docstring as real_function
    return real_function(*args, **kwargs)


def _get_subaction_names(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions._get_subaction_names`.
    """
    from meerschaum.actions import _get_subaction_names as real_function
    return real_function(*args, **kwargs)


def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    from meerschaum.utils.dtypes import serialize_datetime
    return serialize_datetime(dt)


def replace_pipes_in_dict(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.replace_pipes_in_dict`.
    """
    from meerschaum.utils.pipes import replace_pipes_in_dict
    return replace_pipes_in_dict(*args, **kwargs)


def is_pipe_registered(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.pipes.is_pipe_registered`.
    """
    from meerschaum.utils.pipes import is_pipe_registered
    return is_pipe_registered(*args, **kwargs)


def round_time(*args, **kwargs):
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.dtypes.round_time`.
    """
    from meerschaum.utils.dtypes import round_time
    return round_time(*args, **kwargs)


### Export every public callable defined in this module,
### except for the names hidden from the docs in `__pdoc__`.
_current_module = sys.modules[__name__]
__all__ = tuple(
    name
    for name, obj in globals().items()
    if callable(obj)
    and name not in __pdoc__
    and getattr(obj, '__module__', None) == _current_module.__name__
    and not name.startswith('_')
)
def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Attach the function `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or object) which receives the new attribute.

    method_name: Optional[str], default None
        The attribute name to use. If `None`, fall back to `func.__name__`.

    keep_self: Optional[bool], default None
        If `False`, attach a wrapper which discards the leading `self` argument.
        If `None`, the wrapper is used only when `class_def` is a class.
        Otherwise `func` is attached untouched.

    Returns
    -------
    The original `func` object.

    """
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kw):
        ### Discard `self` and forward everything else.
        return func(*args, **kw)

    attr_name = func.__name__ if method_name is None else method_name
    drop_self = (
        keep_self is False
        or (keep_self is None and isinstance(class_def, type))
    )
    setattr(class_def, attr_name, wrapper if drop_self else func)

    return func
Add function func to class class_def.
Parameters
- func (Callable[[Any], Any]): Function to be added as a method of the class
- class_def (Class): Class to be modified.
- method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
- The modified function object.
def generate_password(length: int = 12) -> str:
    """
    Build a random alphanumeric password with the `secrets` module.

    Parameters
    ----------
    length: int, default 12
        How many characters the password should contain.

    Returns
    -------
    A random string of ASCII letters and digits.
    """
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
Generate a secure password of given length.
Parameters
- length (int, default 12): The length of the password.
Returns
- A random password string.
def is_int(s: str) -> bool:
    """
    Determine whether a string represents a whole number.

    Parameters
    ----------
    s: str
        The string to inspect.

    Returns
    -------
    `True` if `s` parses as a float with no fractional part, otherwise `False`.

    """
    try:
        number = float(s)
        return number.is_integer()
    except Exception:
        return False
Check if string is an int.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string was able to be cast to an integer.
def is_uuid(s: str) -> bool:
    """
    Determine whether a value can be parsed as a UUID.

    Parameters
    ----------
    s: str
        The value to inspect (it is converted with `str()` first).

    Returns
    -------
    `True` if `uuid.UUID` accepts the value, otherwise `False`.
    """
    from uuid import UUID

    try:
        UUID(str(s))
    except Exception:
        return False
    return True
Check if a string is a valid UUID.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string is a valid UUID.
def string_to_dict(params_string: str) -> Dict[str, Any]:
    """
    Build a dictionary from its string representation.

    Strings which start with '{' are decoded as JSON.
    Anything else is read as comma-separated `key:value` pairs.

    Parameters
    ----------
    params_string: str
        The text to parse.

    Returns
    -------
    The resulting dictionary.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if not params_string:
        return {}

    import json

    ### Edge case: the generated compose file escapes the JSON oddly,
    ### so the string may be wrapped in one extra character on each side.
    wrapped_json = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if wrapped_json:
        return json.loads(params_string[1:-1])

    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    ### First pass: split on the commas which sit outside of quotes, brackets, and braces.
    pieces = []
    chars = []
    square_depth = 0
    curly_depth = 0
    active_quote = None

    for idx, ch in enumerate(params_string):
        if active_quote is not None:
            ### Only an unescaped, matching quote closes the string.
            if ch == active_quote and (idx == 0 or params_string[idx - 1] != '\\'):
                active_quote = None
        elif ch in ('"', "'"):
            active_quote = ch
        elif ch == '[':
            square_depth += 1
        elif ch == ']':
            square_depth -= 1
        elif ch == '{':
            curly_depth += 1
        elif ch == '}':
            curly_depth -= 1
        elif ch == ',' and square_depth == 0 and curly_depth == 0:
            pieces.append(''.join(chars))
            chars = []
            continue

        chars.append(ch)

    if chars:
        pieces.append(''.join(chars))

    ### Second pass: split each piece on its first colon and evaluate the value.
    parsed = {}
    for piece in pieces:
        piece = piece.strip()
        if not piece:
            continue

        key, colon, raw_val = piece.partition(":")
        if not colon:
            ### Pieces without a colon are ignored.
            continue

        try:
            val = ast.literal_eval(raw_val)
        except Exception:
            ### Not a Python literal: keep the raw text.
            val = str(raw_val)

        parsed[key] = val

    return parsed
Parse a string into a dictionary.
If the string begins with '{', parse as JSON. Otherwise use simple parsing.
Parameters
- params_string (str): The string to be parsed.
Returns
- The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2")
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def to_simple_dict(doc: Dict[str, Any]) -> str:
    """
    Render a document dictionary as comma-separated `key:value` pairs (simple-dict format).

    Strings which would not be read back as strings (e.g. `'1'`) are JSON-encoded
    so that they keep their quotes; other strings are written verbatim.
    Non-string values are always JSON-encoded.
    """
    import json
    import ast
    from meerschaum.utils.dtypes import json_serialize_value

    def _to_json(obj):
        return json.dumps(obj, separators=(',', ':'), default=json_serialize_value)

    def _serialize(value):
        if not isinstance(value, str):
            return _to_json(value)

        try:
            evaluated = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError):
            ### Not a Python literal, so the text is safe to write as-is.
            return value

        return value if isinstance(evaluated, str) else _to_json(value)

    pairs = [f"{key}:{_serialize(val)}" for key, val in doc.items()]
    return ','.join(pairs)
Serialize a document dictionary in simple-dict format.
def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':',
) -> Union[str, List[str]]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to be parsed.

    leading_key: str, default 'MRSM'
        The prefix which marks a substitution.

    begin_key: str, default '{'
        The string which opens the list of keys (directly after `leading_key`).

    end_key: str, default '}'
        The string which closes the list of keys.

    delimeter: str, default ':'
        The separator between the keys.

    Returns
    -------
    The list of keys if `value` follows the substitution syntax,
    otherwise `value` itself (unchanged).
    """
    prefix = leading_key + begin_key
    is_substitution = (
        value.startswith(prefix)
        and value.endswith(end_key)
        ### Guard against the prefix and suffix overlapping (e.g. an unterminated 'MRSM{').
        and len(value) >= len(prefix) + len(end_key)
    )
    if not is_substitution:
        return value

    ### Compute the end index explicitly so that an empty `end_key` still works.
    keys_str = value[len(prefix):len(value) - len(end_key)]
    return keys_str.split(delimeter)
Parse Meerschaum substitution syntax
E.g. MRSM{value1:value2} => ['value1', 'value2']
NOTE: Not currently used. See search_and_substitute_config in meerschaum.config._read_yaml.
def edit_file(
    path: Union[pathlib.Path, str],
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Launch an editor on a file.

    The user's `$EDITOR` is tried first; if it cannot be launched, fall back to `pyvim`.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The file to open.

    default_editor: str, default 'pyvim'
        The editor command to run when `$EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if the editor exited with a return code of 0.
    """
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor_cmd = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor_cmd}'...")
        return_code = call([editor_cmd, path])
    except Exception as e:
        ### The editor command could not be launched, so run the `pyvim` package instead.
        if debug:
            dprint(str(e))
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        return_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return return_code == 0
Open a file for editing.
Attempt to launch the user's defined $EDITOR, otherwise use pyvim.
Parameters
- path (Union[pathlib.Path, str]): The path to the file to be edited.
- default_editor (str, default 'pyvim'): If `$EDITOR` is not set, use this instead. If `pyvim` is not installed, it will install it from PyPI.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating the file was successfully edited.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Look up the terminal's dimensions.
    When the terminal cannot be queried, read the `COLUMNS` and `LINES`
    environment variables and fall back to the given defaults.

    Parameters
    ----------
    default_cols: int, default 100
        The number of columns to report when it cannot be determined.

    default_lines: int, default 120
        The number of lines to report when it cannot be determined.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    try:
        terminal = os.get_terminal_size()
        return terminal.columns, terminal.lines
    except Exception:
        env = os.environ
        return (
            int(env.get('COLUMNS', str(default_cols))),
            int(env.get('LINES', str(default_lines))),
        )
Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).
Parameters
- default_cols (int, default 100): If the columns cannot be determined, return this value.
- default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
- A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Walk through an iterable in fixed-size chunks.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The items to be grouped into chunks.

    chunksize: int
        The number of items per chunk.

    fillvalue: Optional[Any], default None
        The value used to pad the last chunk when the items do not divide evenly.

    Returns
    -------
    An iterator of tuples of size `chunksize`.

    """
    from itertools import zip_longest

    ### Every slot shares one iterator, so each tuple consumes `chunksize` consecutive items.
    shared_iterator = iter(iterable)
    slots = [shared_iterator] * chunksize
    return zip_longest(*slots, fillvalue=fillvalue)
Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
Parameters
- iterable (Iterable[Any]): The iterable to iterate over in chunks.
- chunksize (int): The size of chunks to iterate with.
- fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
- A generator of tuples of size `chunksize`.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Build a new dictionary whose items are ordered by value.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to sort.

    Returns
    -------
    A new dictionary sorted by value, or `d` itself if sorting fails
    (e.g. the values cannot be compared).

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        ordered_items = sorted(d.items(), key=lambda pair: pair[1])
        return dict(ordered_items)
    except Exception:
        return d
Sort a dictionary's values and return a new dictionary.
Parameters
- d (Dict[Any, Any]): The dictionary to be sorted.
Returns
- A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collapse the three-level pipes dictionary into a flat list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The nested pipes dictionary to flatten.

    Returns
    -------
    A list of the values found at the third level of nesting (the `Pipe` objects).

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]
Convert the standard pipes dictionary into a list.
Parameters
- pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
- A list of `Pipe` objects.
def timed_input(
    seconds: int = 10,
    timeout_message: str = "",
    prompt: str = "",
    icon: bool = False,
    **kw
) -> Union[str, None]:
    """
    Prompt the user for input, giving up after a time limit.

    Parameters
    ----------
    seconds: int, default 10
        How long to wait for the input (in seconds).

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        NOTE: Accepted but not referenced in this function's body.

    prompt: str, default ''
        The prompt to show while waiting.

    icon: bool, default False
        If `True`, print the configured input icon.
        NOTE: Accepted but not referenced in this function's body.


    Returns
    -------
    The text entered by the user, or `None` if nothing was read in time.

    """
    import signal
    import time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def _on_alarm(signum, frame):
        raise TimeoutExpired

    ### Install the handler, then schedule SIGALRM to fire after `seconds`.
    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)

    response = None
    try:
        response = input(prompt)
    except TimeoutExpired:
        response = None
    except (EOFError, RuntimeError):
        ### Input cannot be read: show the prompt and wait out the window instead.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            pass
    finally:
        ### Always cancel the pending alarm.
        signal.alarm(0)

    return response
Accept user input only for a brief period of time.
Parameters
- seconds (int, default 10): The number of seconds to wait.
- timeout_message (str, default ''): The message to print after the window has elapsed.
- prompt (str, default ''): The prompt to print during the window.
- icon (bool, default False): If `True`, print the configured input icon.
Returns
- The input string entered by the user.
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    from meerschaum.utils.packages import attempt_import
    import socket

    modules = attempt_import('gevent', 'gevent.socket', 'gevent.monkey')
    gevent, gevent_socket, gevent_monkey = modules

    ### The standard socket class is swapped out once patching has been applied.
    if socket.socket is gevent_socket.socket:
        return

    gevent_monkey.patch_all()
Check if gevent monkey patching is enabled, and if not, then apply patching.
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Test a string against a simple (lowercase-only) email pattern.

    Parameters
    ----------
    email: str
        The string to test.

    Returns
    -------
    A `re.Match` object (which is truthy) if the string matches the pattern, otherwise `None`.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>

    """
    import re

    email_pattern = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
    match = re.search(email_pattern, email)
    return match
Check whether a string is a valid email.
Parameters
- email (str): The string to be examined.
Returns
- None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
    """
    Return the length of the longest line in a string.

    Parameters
    ----------
    string: str
        The string to be examined.

    widest: bool, default True
        No longer used; the widest line is always measured.

    Returns
    -------
    An integer for the text's visual width.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    ### `str.split` always yields at least one element, so `max` never sees an empty sequence.
    return max(len(line) for line in string.split('\n'))
Calculate the width of a string, either by its widest or last line.
Parameters
- string (str): The string to be examined.
- widest (bool, default True): No longer used because widest is always assumed to be true.
Returns
- An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def get_val_from_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...]) -> Any:
    """
    Walk a nested dictionary along a tuple of keys and return what is found there.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to search.

    path: Tuple[Any, ...]
        The keys to follow, outermost first. An empty path returns `d` itself.

    Returns
    -------
    The value from the end of the path.
    """
    current = d
    for key in path:
        current = current[key]
    return current
Get a value from a dictionary with a tuple of keys.
Parameters
- d (Dict[Any, Any]): The dictionary to search.
- path (Tuple[Any, ...]): The path of keys to traverse.
Returns
- The value from the end of the path.
def set_val_in_dict_path(d: Dict[Any, Any], path: Tuple[Any, ...], val: Any) -> None:
    """
    Assign a value inside a nested dictionary, addressed by a tuple of keys.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to modify in place.

    path: Tuple[Any, ...]
        The keys to follow; every key but the last must already exist.

    val: Any
        The value to set at the end of the path.
    """
    *parent_keys, final_key = path
    ### Resolve the container that holds the final key, then assign into it.
    parent = get_val_from_dict_path(d, tuple(parent_keys))
    parent[final_key] = val
Set a value in a dictionary with a tuple of keys.
Parameters
- d (Dict[Any, Any]): The dictionary to search.
- path (Tuple[Any, ...]): The path of keys to traverse.
- val (Any): The value to set at the end of the path.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through. It is not modified.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        The same string is used at every nesting level.

    Returns
    -------
    Another dictionary where values whose keys contain `'password'`
    are replaced with `replace_with` (`'*'`).
    For a `'uri'` key, the password portion of the URI is masked
    when the URI can be parsed into a username and password.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy

    def _mask(secret: Any) -> str:
        return replace_with * len(str(secret))

    _d = copy.deepcopy(d)
    for k, v in d.items():
        if isinstance(v, dict):
            ### Forward `replace_with` so nested passwords use the caller's mask.
            _d[k] = replace_password(v, replace_with=replace_with)
            continue

        key = str(k).lower()
        if 'password' in key:
            _d[k] = _mask(v)
        elif key == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(v)
            except Exception:
                ### Unparseable URIs are left untouched.
                uri_params = None
            if not uri_params:
                continue
            if 'username' not in uri_params or 'password' not in uri_params:
                continue
            _d[k] = v.replace(
                uri_params['username'] + ':' + uri_params['password'],
                uri_params['username'] + ':' + _mask(uri_params['password']),
            )
    return _d
Recursively replace passwords in a dictionary.
Parameters
- d (Dict[str, Any]): The dictionary to search through.
- replace_with (str, default '*'): The string to replace each character of the password with.
Returns
- Another dictionary where values to the keys 'password' are replaced with replace_with ('*').
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Reduce positional and keyword arguments to those a function supports.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter and pass to `func`.

    **kwargs
        Keyword arguments to filter and pass to `func`.

    Returns
    -------
    The `args` and `kwargs` accepted by `func`.
    """
    ### Positionals are filtered first, then keywords.
    return filter_positionals(func, *args), filter_keywords(func, **kwargs)
Filter out unsupported positional and keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): Positional arguments to filter and pass to func.
- **kwargs: Keyword arguments to filter and pass to func.
Returns
- The args and kwargs accepted by func.
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs` parameter, every keyword is returned.
    Names that can only be passed positionally (positional-only or `*args`)
    are dropped.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters

    ### Inspect the parameter kind rather than its string form:
    ### a default or annotation containing '**' is not a `**kwargs` parameter.
    accepts_any_keyword = any(
        param.kind == inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    )
    if accepts_any_keyword:
        return kw

    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return {
        k: v
        for k, v in kw.items()
        if k in func_params and func_params[k].kind in keyword_kinds
    }
Filter out unsupported keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- **kw (Any): The arguments to be filtered and passed into func.
Returns
- A dictionary of keyword arguments accepted by func.
Examples
>>> def foo(a=1, b=2):
... return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Only the leading required positional parameters of `func` are considered:
    the scan stops at the first parameter that has a default value,
    is keyword-only, or is `*args` / `**kwargs`.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect

    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    acceptable_args = []
    num_invalid = 0

    for i, param in enumerate(inspect.signature(func).parameters.values()):
        ### Check the parameter's kind and default directly instead of searching
        ### its string form for '=' or '*', which misses keyword-only parameters.
        is_required_positional = (
            param.kind in positional_kinds
            and param.default is inspect.Parameter.empty
        )
        if not is_required_positional:
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Imported lazily: only needed when a warning must be raised.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have' if num_invalid != 1 else ' has')
            + " been filled with `None`.",
        )

    return tuple(acceptable_args)
Filter out unsupported positional arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): The arguments to be filtered and passed into func. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with None.
Returns
- A tuple of positional arguments accepted by func.
Examples
>>> def foo(a, b):
... return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an ordered dict, converting nested ordered dicts as well.
    The original OrderedDict is left untouched.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }
Convert an ordered dict to a dict. Does not mutate the original OrderedDict.
def remove_ansi(s: str) -> str:
    """
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI characters removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    ansi_pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(ansi_pattern, '', s)
Remove ANSI escape characters from a string.
Parameters
- s (str): The string to be cleaned.
Returns
- A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match: bool = True,
    _additional_options: Optional[List[str]] = None,
) -> List[str]:
    """
    Collect `type:label` connector keys from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use every type defined in the configuration plus `'plugin'`.

    search_term: str, default ''
        Only keys starting with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    _additional_options: Optional[List[str]], default None
        Extra keys to consider alongside the configured ones.

    Returns
    -------
    A sorted list of the keys of defined connectors.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types) or (list(connectors.keys()) + ['plugin'])

    labels = []
    for conn_type in requested_types:
        if conn_type != 'plugin':
            ### The 'default' label is omitted.
            labels.extend(
                f'{conn_type}:{label}'
                for label in connectors.get(conn_type, {})
                if label != 'default'
            )
            continue

        from meerschaum.plugins import get_data_plugins
        labels.extend(
            f'{conn_type}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        )

    if _additional_options:
        labels.extend(_additional_options)

    ### An empty string never equals a `type:label` key, so nothing is excluded
    ### when exact matches are allowed.
    excluded_label = search_term if ignore_exact_match else ''
    return sorted(
        label
        for label in labels
        if label.startswith(search_term) and label != excluded_label
    )
Read connector labels from the configuration dictionary.
Parameters
- *types (str): The connector types. If none are provided, use the defined types ('sql' and 'api') and 'plugin'.
- search_term (str, default ''): A filter on the connectors' labels.
- ignore_exact_match (bool, default True): If True, skip a connector if the search_term is an exact match.
Returns
- A list of the keys of defined connectors.
def wget(
    url: str,
    dest: Optional[Union[str, 'pathlib.Path']] = None,
    headers: Optional[Dict[str, Any]] = None,
    color: bool = True,
    debug: bool = False,
    **kw: Any
) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib.request`.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory under the file name taken from
        the `content-disposition` header or, failing that, the last URL segment.

    headers: Optional[Dict[str, Any]], default None
        Request headers to send.

    color: bool, default True
        If `debug` is `True`, print color output.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The path to the downloaded file.

    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    import re
    import shutil
    import ssl
    import urllib.request

    if headers is None:
        headers = {}
    request = urllib.request.Request(url, headers=headers)
    if not color:
        dprint = print
    if debug:
        dprint(f"Downloading from '{url}'...")

    try:
        response = urllib.request.urlopen(request)
    except Exception:
        ### NOTE(review): the retry skips certificate verification, which is insecure.
        ### The unverified context is scoped to this one request instead of being
        ### installed as the process-wide default.
        try:
            response = urllib.request.urlopen(
                request,
                context=ssl._create_unverified_context(),
            )
        except Exception as _e:
            print(_e)
            response = None

    if response is None or response.code != 200:
        error_msg = f"Failed to download from '{url}'."
        if color:
            error(error_msg)
        else:
            print(error_msg)
        import sys
        sys.exit(1)

    ### Close the response once the body has been written.
    with response:
        disposition = response.headers.get('content-disposition', None)
        ### Fall back to the URL when the header is absent or carries no filename.
        filenames = (
            re.findall("filename=(.+)", disposition)
            if disposition is not None
            else []
        )
        fname = filenames[0].strip('"') if filenames else url.split('/')[-1]

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        with open(dest, 'wb') as f:
            ### Read through the response object rather than its raw `fp`
            ### so that transfer encodings are decoded by `http.client`.
            shutil.copyfileobj(response, f)

    if debug:
        dprint(f"Downloaded file '{dest}'.")

    return dest
Mimic wget with urllib.request.
Parameters
- url (str): The URL to the resource to be downloaded.
- dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If None, save to the current directory.
- color (bool, default True): If debug is True, print color output.
- debug (bool, default False): Verbosity toggle.
Returns
- The path to the downloaded file.
def async_wrap(func):
    """
    Wrap a synchronous function in a coroutine that runs it in an executor.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts two extra keyword arguments,
    `loop` and `executor`; all other arguments are forwarded to `func`.
    When `loop` is `None`, `asyncio.get_event_loop()` is used.
    """
    import asyncio

    async def run(*args, loop=None, executor=None, **kwargs):
        event_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = functools.partial(func, *args, **kwargs)
        return await event_loop.run_in_executor(executor, bound_call)

    return functools.wraps(func)(run)
Run a synchronous function as async. https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
def debug_trace(browser: bool = True):
    """
    Open a debugger to trace the execution of the program.

    This forwards to `meerschaum.utils.debug.trace`, passing `browser` through.
    """
    from meerschaum.utils.debug import trace as _open_trace
    _open_trace(browser=browser)
Open a web-based debugger to trace the execution of the program.
This is an alias for meerschaum.utils.debug.trace.
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str: str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.
        Any iterable (e.g. a tuple or set) is accepted.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.
        If `False`, every item is simply joined by the comma and space strings.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    ### Materialize once so tuples, sets and generators behave like lists.
    quoted = [q + str(item) + q for item in items]
    if not quoted:
        return ''

    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item (including the last) is comma-separated.
    if not and_:
        return (c + s).join(quoted)

    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]
Return a formatted string of list items separated by commas.
Parameters
- items ([List[Any]]): The items to be printed as an English list.
- quotes (bool, default True): If True, wrap items in quotes.
- quote_str (str, default "'"): If quotes is True, prepend and append each item with this string.
- and_ (bool, default True): If True, include the word 'and' before the final item in the list.
- and_str (str, default 'and'): If and_ is True, insert this string where 'and' normally would in an English list.
- oxford_comma (bool, default True): If True, include the Oxford Comma (comma before the final 'and'). Only applies when and_ is True.
- spaces (bool, default True): If True, separate items with space_str.
- space_str (str, default ' '): If spaces is True, separate items with this string.
Returns
- A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[timedelta, int], round_unit: bool = False) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is an integer, assume it corresponds to minutes.
        An integer `delta` with `round_unit=False` is returned as its plain string form.

    round_unit: bool, default False
        If `True`, round the output to a single unit.

    Returns
    -------
    A formatted string, fit for human eyes.
    Negative intervals are suffixed with `' ago'`.
    """
    from meerschaum.utils.packages import attempt_import
    if is_int(str(delta)) and not round_unit:
        return str(delta)

    humanfriendly = attempt_import('humanfriendly', lazy=False)
    delta_seconds = (
        delta.total_seconds()
        if hasattr(delta, 'total_seconds')
        else (delta * 60)
    )

    is_negative = delta_seconds < 0
    delta_seconds = abs(delta_seconds)
    replace_units = {}

    minute = 60
    hour = 3600
    day = 86400
    week = day * 7
    month = week * 4
    year = day * 364

    if round_unit:
        if delta_seconds < 1:
            delta_seconds = round(delta_seconds, 2)
        elif delta_seconds < minute:
            delta_seconds = int(delta_seconds)
        elif delta_seconds < hour:
            delta_seconds = int(delta_seconds / minute) * minute
        elif delta_seconds < day:
            delta_seconds = int(delta_seconds / hour) * hour
        elif delta_seconds < week:
            delta_seconds = int(delta_seconds / day) * day
        elif delta_seconds < month:
            delta_seconds = int(delta_seconds / week) * week
        elif delta_seconds < (month * 13):
            ### Express the month count as that many weeks, then relabel the unit below.
            delta_seconds = int(delta_seconds / month) * week
            ### Replace the singular stem so that both '1 week' and 'N weeks'
            ### are relabeled ('1 month', 'N months').
            replace_units['week'] = 'month'
        else:
            delta_seconds = int(delta_seconds / year) * year

    delta_str = humanfriendly.format_timespan(delta_seconds)
    if round_unit:
        ### Keep only the leading (largest) unit.
        if ',' in delta_str:
            delta_str = delta_str.split(',')[0]
        elif ' and ' in delta_str:
            delta_str = delta_str.split(' and ')[0]

    for parsed_unit, replacement_unit in replace_units.items():
        delta_str = delta_str.replace(parsed_unit, replacement_unit)

    return delta_str + (' ago' if is_negative else '')
Return a human-readable string for a timedelta (or int minutes).
Parameters
- delta (Union[timedelta, int]): The interval to print. If delta is an integer, assume it corresponds to minutes.
- round_unit (bool, default False): If True, round the output to a single unit.
Returns
- A formatted string, fit for human eyes.
def is_docker_available() -> bool:
    """Return `True` if `docker ps` runs and exits with status 0."""
    import subprocess
    try:
        exit_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if we can connect to the Docker engine.
def is_android() -> bool:
    """Return `True` if the current platform is Android (`sys.getandroidapilevel` exists)."""
    try:
        sys.getandroidapilevel
    except AttributeError:
        return False
    return True
Return True if the current platform is Android.
def is_bcp_available() -> bool:
    """Return `True` if the MSSQL `bcp` utility runs (`bcp -v` exits with status 0)."""
    import subprocess
    try:
        exit_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if the MSSQL bcp utility is installed.
def is_systemd_available() -> bool:
    """
    Return `True` if `systemctl whoami` exits with status 0.

    A missing `systemctl` binary quietly yields `False`;
    any other failure prints its traceback before returning `False`.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['systemctl', 'whoami'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except FileNotFoundError:
        return False
    except Exception:
        import traceback
        traceback.print_exc()
        return False
    return exit_code == 0
Check if running on systemd.
def is_tmux_available() -> bool:
    """
    Return `True` if `tmux -V` runs and exits with status 0.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['tmux', '-V'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        ### Covers a missing binary (FileNotFoundError) as well as any other failure.
        return False
    return exit_code == 0
Check if tmux is installed.
def get_last_n_lines(file_name: str, N: int) -> List[str]:
    """
    Return up to the last `N` lines of a file, reading backwards from its end.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    Parameters
    ----------
    file_name: str
        The path of the file to read (opened in binary mode).

    N: int
        The maximum number of lines to return.
        Values below 1 never satisfy the length check, so every line is returned.

    Returns
    -------
    The lines in file order, decoded with the default codec (UTF-8).
    If the file ends with a newline, the empty string after it counts as a line.
    """
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at the end of the file and walk backwards one byte at a time.
        read_obj.seek(0, os.SEEK_END)
        ### Bytes of the line being read, collected in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Restore byte order *before* decoding so that
                ### multi-byte characters are decoded correctly.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)

        ### The start of the file was reached: whatever remains is the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())

    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f
        An open, seekable file object.

    n
        The number of lines to return.

    offset, default None
        The number of trailing lines to skip before the returned window.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### Seek offsets must be integers, so truncate the growing estimate.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            ### The file is smaller than the estimate (or does not support
            ### seeking from the end): read from the beginning instead.
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### When reading started mid-file, the first line may be a partial one,
        ### so require one extra line before trusting the last `to_read` lines.
        if len(lines) > to_read or pos == 0:
            window_end = -offset if offset else None
            return lines[-to_read:window_end], len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
https://stackoverflow.com/a/692616/9699829
Reads n lines from f with an offset of offset lines. The return
value is a tuple in the form (lines, has_more) where has_more is
an indicator that is True if there are more lines in the file.
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.
        Strings shorter than `max_len` are returned unchanged.

    delimeter: str, default '_'
        Split `item` by this string into several sections
        and join the shortened sections with it.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already one character long and the string is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    ### Split on the requested delimiter (not a hard-coded underscore).
    ### An empty delimiter cannot be split on, so treat the item as one section.
    sections = item.split(delimeter) if delimeter else [item]

    ### Reserve room for one delimiter per section.
    available_chars = max_len - (len(delimeter) * len(sections))

    while sum(len(section) for section in sections) > available_chars:
        ### Drop the last character of every section longer than one character.
        shortened = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        if shortened == sections:
            raise Exception(f"String could not be truncated: '{item}'")
        sections = shortened

    return delimeter.join(sections)
Remove characters from each section of a string until the length is within the limit.
Parameters
- item (str): The item name to be truncated.
- delimeter (str, default '_'): Split item by this string into several sections.
- max_len (int, default 128): The max acceptable length of the truncated version of item.
Returns
- The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def truncate_text_for_display(
    text: str,
    max_length: int = 50,
    suffix: str = '…',
) -> str:
    """
    Truncate a potentially long string for display purposes.

    Parameters
    ----------
    text: str
        The string to be truncated.

    max_length: int, default 50
        The maximum length of `text` before truncation.

    suffix: str, default '…'
        The string appended to the shortened `text` to indicate truncation.
        It counts toward `max_length` and is itself cut short
        if `max_length` is smaller than its length.

    Returns
    -------
    A string of length `max_length` or less.
    """
    if len(text) <= max_length:
        return text

    ### Clamp so a tiny or negative `max_length` cannot produce a negative slice
    ### (which would keep most of `text` and exceed the limit).
    limit = max(max_length, 0)
    keep_chars = max(limit - len(suffix), 0)
    return (text[:keep_chars] + suffix)[:limit]
Truncate a potentially long string for display purposes.
Parameters
- text (str): The string to be truncated.
- max_length (int, default 50): The maximum length of text before truncation.
- suffix (str, default '…'): The string to append to the truncated text to indicate truncation.
Returns
- A string of length max_length or less.
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude.
    Return two lists: positive and negative values.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Include values that start with this string in the second list
        (with the prefix stripped).
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of the included values (unchanged) and the excluded values
    (as strings, without the prefix).
    """
    if negation_prefix is None:
        from meerschaum._internal.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    prefix_length = len(negation_prefix)
    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(negation_prefix):
            ### Positive values keep their original type.
            included.append(val)
            continue
        excluded.append(val_str[prefix_length:])

    return included, excluded
Separate the negated values from the positive ones. Return two lists: positive and negative values.
Parameters
- vals (Union[List[str], Tuple[str]]): A list of strings to parse.
- negation_prefix (Optional[str], default None):
Include values that start with this string in the second list. If None, use the system default (_).
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Split each value of a params dictionary into include- and exclude-lists.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values that are not a list, tuple, set, or an object with `astype`
        are treated as a single-element list.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    in_ex_params = {}
    for col, val in (params or {}).items():
        is_collection = isinstance(val, (list, tuple, set)) or hasattr(val, 'astype')
        in_ex_params[col] = separate_negation_values(val if is_collection else [val])
    return in_ex_params
Translate a params dictionary into lists of include- and exclude-values.
Parameters
- params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
- A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> Iterable[Any]:
    """
    Lazily yield the items of an arbitrarily nested list, depth-first.

    Only `list` instances are descended into; other containers are yielded as-is.
    """
    ### A stack of iterators stands in for recursion: the top iterator is
    ### consumed until it hits a nested list, which is pushed above it.
    pending = [iter(list_)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                pending.append(iter(item))
                break
            yield item
        else:
            pending.pop()
Recursively flatten a list.
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The text is parsed with Python's own grammar, so commas and equals signs
    inside literals (strings, lists, dicts) are handled correctly.
    Only literal values are accepted (see `ast.literal_eval`).

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).

    Raises
    ------
    ValueError or SyntaxError
        If a value is not a valid Python literal.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    (123, 456), {'foo': 789, 'bar': 'baz'}
    >>> parse_arguments_str('[1, 2], label="a,b=c"')
    ([1, 2],), {'label': 'a,b=c'}
    """
    import ast

    ### Preferred path: wrap the text in a dummy call and read its arguments.
    ### The surrounding newlines keep a trailing comment from swallowing the ')'.
    try:
        tree = ast.parse(f"_f(\n{args_str}\n)", mode='eval')
    except (SyntaxError, ValueError):
        tree = None

    call_node = tree.body if tree is not None else None
    is_plain_call = (
        isinstance(call_node, ast.Call)
        and isinstance(call_node.func, ast.Name)
        and call_node.func.id == '_f'
        ### `**mapping` arguments have no keyword name and are not supported.
        and all(keyword.arg is not None for keyword in call_node.keywords)
    )
    if is_plain_call:
        args = tuple(ast.literal_eval(node) for node in call_node.args)
        kwargs = {
            keyword.arg: ast.literal_eval(keyword.value)
            for keyword in call_node.keywords
        }
        return args, kwargs

    ### Fallback: the original comma / equals splitting, kept so that inputs
    ### which are not valid call syntax (e.g. keys like `my-key`) still parse.
    legacy_args = []
    legacy_kwargs = {}
    for part in args_str.split(','):
        if '=' in part:
            key, val = part.split('=', 1)
            legacy_kwargs[key.strip()] = ast.literal_eval(val.strip())
        else:
            legacy_args.append(ast.literal_eval(part.strip()))

    return tuple(legacy_args), legacy_kwargs
Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.
Parameters
- args_str (str): The contents of the function parameter (as a string).
Returns
- A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
((123, 456), {'foo': 789, 'bar': 'baz'})
def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Create a symlink at `dest_path` pointing to `src_path`
    (a wrapper around `pathlib.Path.symlink_to` with fallbacks for Windows).

    If the symlink cannot be created on Windows, a junction is created for directories,
    and files are copied as a last resort.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path (the target of the link).

    dest_path: pathlib.Path
        The destination path (where the link is created).

    Returns
    -------
    A SuccessTuple indicating success.
    """
    already_linked = dest_path.exists() and dest_path.resolve() == src_path.resolve()
    if already_linked:
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        error_msg = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows has fallbacks; elsewhere, report the error.
    import platform
    if platform.system() != 'Windows':
        return False, error_msg

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    if not src_path.is_dir():
        ### Last resort: copy the file on Windows.
        import shutil
        try:
            shutil.copy(src_path, dest_path)
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Directories may be linked with a junction instead.
    try:
        _winapi.CreateJunction(str(src_path), str(dest_path))
    except Exception as e:
        return False, str(e)
    return True, "Success"
Wrap around pathlib.Path.symlink_to, but add support for Windows.
Parameters
- src_path (pathlib.Path): The source path.
- dest_path (pathlib.Path): The destination path.
Returns
- A SuccessTuple indicating success.
def is_symlink(path: pathlib.Path) -> bool:
    """
    Return whether `path` is a symlink.

    This wraps `path.is_symlink()`; on Windows, a path which `os.readlink()` can resolve
    (e.g. a junction) is also treated as a symlink.
    """
    import platform

    if path.is_symlink():
        return True

    on_windows = platform.system() == 'Windows'
    if not on_windows:
        return False

    try:
        link_target = os.readlink(path)
    except OSError:
        return False
    return bool(link_target)
Wrap path.is_symlink() but add support for Windows junctions.
def parametrized(dec):
    """
    A meta-decorator which lets the decorator `dec` accept parameters.

    The decorated `dec` is called as `dec(func, *args, **kwargs)`,
    where `args` and `kwargs` are the parameters given at the decoration site.

    https://stackoverflow.com/a/26151604/9699829
    """
    def decorator_factory(*args, **kwargs):
        ### Capture the parameters, then wait for the function being decorated.
        def decorator(func):
            return dec(func, *args, **kwargs)
        return decorator
    return decorator_factory
A meta-decorator for allowing other decorator functions to have parameters.
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Every member's destination path is verified to be inside of `output_dir`
    before anything is extracted.

    NOTE: Only member names are validated; link targets inside the archive are not inspected.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    An `Exception` if any member would be extracted outside of `output_dir`.
    """

    def is_within_directory(directory, target) -> bool:
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        try:
            ### `commonpath` compares whole path components.
            ### `commonprefix` compares characters, so `/tmp/out_evil` would
            ### incorrectly count as being within `/tmp/out`.
            common_path = os.path.commonpath([abs_directory, abs_target])
        except ValueError:
            ### E.g. the paths are on different drives (Windows).
            return False
        return common_path == abs_directory

    ### Validate every member before extracting anything.
    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(output_dir, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir, members=None, numeric_owner=False)
Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.
Parameters
- tarf (file): The TAR file opened with tarfile.open(path, 'r:gz').
- output_dir (Union[str, pathlib.Path]): The output directory.
def to_snake_case(name: str) -> str:
    """
    Convert the given string to snake case.

    Word boundaries in camel-case text become underscores, punctuation is removed,
    runs of whitespace become a single underscore, and the result is lower-cased.

    Parameters
    ----------
    name: str
        The input text to convert to snake case.

    Returns
    -------
    A snake-case version of `name`.

    Examples
    --------
    >>> to_snake_case("HelloWorld!")
    'hello_world'
    >>> to_snake_case("This has spaces in it.")
    'this_has_spaces_in_it'
    >>> to_snake_case("already_in_snake_case")
    'already_in_snake_case'
    """
    import re

    ### Applied in order: (pattern, replacement).
    substitutions = (
        (r'(.)([A-Z][a-z]+)', r'\1_\2'),
        (r'([a-z0-9])([A-Z])', r'\1_\2'),
        (r'[^\w\s]', ''),
        (r'\s+', '_'),
    )

    snake = name
    for pattern, replacement in substitutions:
        snake = re.sub(pattern, replacement, snake)
    return snake.lower()
Return the given string in snake-case-style.
Parameters
- name (str): The input text to convert to snake case.
Returns
- A snake-case version of name.
Examples
>>> to_snake_case("HelloWorld!")
'hello_world'
>>> to_snake_case("This has spaces in it.")
'this_has_spaces_in_it'
>>> to_snake_case("already_in_snake_case")
'already_in_snake_case'
def get_directory_size(path: pathlib.Path) -> int:
    """
    Return the cumulative size of a directory's files in bytes.
    https://stackoverflow.com/a/55659577/9699829

    Parameters
    ----------
    path: pathlib.Path
        The directory to be searched recursively.

    Returns
    -------
    The sum of the sizes (in bytes) of every file beneath `path`.
    An empty directory yields `0`.
    """
    ### `rglob('*')` also yields directories (and broken symlinks),
    ### so only count entries which are actual files.
    return sum(
        entry.stat().st_size
        for entry in path.rglob('*')
        if entry.is_file()
    )
Return the cumulative size of a directory's files in bytes. https://stackoverflow.com/a/55659577/9699829
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that existing imports from this module continue to work.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`.
    """
    ### Imported lazily, inside of the function.
    from meerschaum.actions import choose_subaction as _delegate
    result = _delegate(*args, **kwargs)
    return result
Placeholder function to prevent breaking legacy behavior.
See meerschaum.actions.choose_subaction.
def print_options(*args, **kwargs) -> None:
    """
    Legacy shim kept so that existing imports from this module continue to work.
    All arguments are forwarded to `meerschaum.utils.formatting.print_options`.
    """
    ### Imported lazily, inside of the function.
    from meerschaum.utils.formatting import print_options as _delegate
    result = _delegate(*args, **kwargs)
    return result
Placeholder function to prevent breaking legacy behavior.
See meerschaum.utils.formatting.print_options.
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    This delegates to `meerschaum.utils.dtypes.serialize_datetime`
    and is suitable as the `default` callable for `json.dumps`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    ### Imported lazily, inside of the function.
    from meerschaum.utils.dtypes import serialize_datetime as _serialize
    serialized = _serialize(dt)
    return serialized
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'