meerschaum.utils.misc
Miscellaneous functions go here
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Miscellaneous functions go here
"""

from __future__ import annotations
import sys
from datetime import timedelta, datetime, timezone
from meerschaum.utils.typing import (
    Union,
    Any,
    Callable,
    Optional,
    List,
    Dict,
    SuccessTuple,
    Iterable,
    PipesDict,
    Tuple,
    InstanceConnector,
    Hashable,
    Generator,
    Iterator,
    TYPE_CHECKING,
)
import meerschaum as mrsm
if TYPE_CHECKING:
    import collections
    import pathlib

__pdoc__: Dict[str, bool] = {
    'to_pandas_dtype': False,
    'filter_unseen_df': False,
    'add_missing_cols_to_df': False,
    'parse_df_datetimes': False,
    'df_from_literal': False,
    'get_json_cols': False,
    'get_unhashable_cols': False,
    'enforce_dtypes': False,
    'get_datetime_bound_from_df': False,
    'df_is_chunk_generator': False,
    'choices_docstring': False,
    '_get_subaction_names': False,
}

def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Add function `func` to class `class_def`.

    Parameters
    ----------
    func: Callable[[Any], Any]
        Function to be added as a method of the class

    class_def: Class
        Class to be modified.

    method_name: Optional[str], default None
        New name of the method. None will use func.__name__ (default).

    keep_self: Optional[bool], default None
        If `False`, attach a wrapper which discards its first positional argument
        (`self`) before calling `func`. If `None`, the wrapper is used only when
        `class_def` is a class. Any other value attaches `func` unchanged.

    Returns
    -------
    The modified function object.

    """
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kw):
        ### `func` does not expect `self`, so drop it.
        return func(*args, **kw)

    if method_name is None:
        method_name = func.__name__

    is_class = isinstance(class_def, type)
    drop_self = (is_class and keep_self is None) or keep_self is False
    setattr(class_def, method_name, wrapper if drop_self else func)

    return func


def generate_password(length: int = 12) -> str:
    """Generate a secure password of given length.

    Parameters
    ----------
    length : int, default 12
        The length of the password.

    Returns
    -------
    A random password string made of ASCII letters (chosen with `secrets`).

    """
    import secrets, string
    return ''.join(secrets.choice(string.ascii_letters) for _ in range(length))


def is_int(s: str) -> bool:
    """
    Check if string is an int.

    Parameters
    ----------
    s: str
        The string to be checked.

    Returns
    -------
    A bool indicating whether the value parses as a float with no fractional part
    (e.g. `'1'` and `'1.0'` are both `True`).

    """
    try:
        return float(s).is_integer()
    except Exception:
        return False


def string_to_dict(
    params_string: str
) -> Dict[str, Any]:
    """
    Parse a string into a dictionary.

    If the string begins with '{', parse as JSON. Otherwise use simple parsing.

    Parameters
    ----------
    params_string: str
        The string to be parsed. `None` and `''` produce an empty dictionary.

    Returns
    -------
    The parsed dictionary.

    Raises
    ------
    IndexError
        If a comma-separated segment does not contain a `:`.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if params_string is None or params_string == "":
        return {}

    import json

    ### Kind of a weird edge case.
    ### In the generated compose file, there is some weird escaping happening,
    ### so the string to be parsed starts and ends with a single quote.
    if (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    ):
        return json.loads(params_string[1:-1])
    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    def _literal_or_str(text: str) -> Any:
        """Evaluate `text` as a Python literal, falling back to the raw string."""
        try:
            return ast.literal_eval(text)
        except Exception:
            return str(text)

    params_dict = {}
    for param in params_string.split(","):
        *keys, raw_val = param.split(":")
        val = _literal_or_str(raw_val)

        ### Walk (and create) the nested dictionaries for all but the final key.
        cursor = params_dict
        for raw_key in keys[:-1]:
            key = _literal_or_str(raw_key)
            if key not in cursor:
                cursor[key] = {}
            cursor = cursor[key]

        ### NOTE: The final key is intentionally kept as a plain string.
        cursor[keys[-1]] = val

    return params_dict


def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':'
) -> List[Any]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    If `value` does not start with `leading_key`, it is returned unchanged.
    """
    if not value.startswith(leading_key):
        return value

    inner = value[len(leading_key):]
    if begin_key and inner.startswith(begin_key):
        inner = inner[len(begin_key):]
    if end_key and inner.endswith(end_key):
        inner = inner[:-len(end_key)]
    return inner.split(delimeter)


def edit_file(
    path: 'Union[pathlib.Path, str]',
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Open a file for editing.

    Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        If `$EDITOR` is not set, use this instead.
        If `pyvim` is not installed, it will install it from PyPI.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating the file was successfully edited.
    """
    import os
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv
    try:
        EDITOR = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{EDITOR}'...")
        rc = call([EDITOR, path])
    except Exception as e: ### can't open with default editors
        if debug:
            dprint(e)
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug)
    return rc == 0


def is_pipe_registered(
    pipe: 'mrsm.Pipe',
    pipes: 'PipesDict',
    debug: bool = False
) -> bool:
    """
    Check if a Pipe is inside the pipes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to see if it's in the dictionary.

    pipes: PipesDict
        The dictionary to search inside.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether the pipe is inside the dictionary.
    """
    from meerschaum.utils.debug import dprint
    ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key
    if debug:
        dprint(f'{ck}, {mk}, {lk}')
        dprint(f'{pipe}, {pipes}')
    return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]


def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Determine the columns and lines in the terminal.
    If they cannot be determined, return the default values (100 columns and 120 lines).

    Parameters
    ----------
    default_cols: int, default 100
        If the columns cannot be determined, return this value.

    default_lines: int, default 120
        If the lines cannot be determined, return this value.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    import os

    def _env_int(name: str, default: int) -> int:
        """Read an integer environment variable, ignoring malformed values."""
        try:
            return int(os.environ.get(name, str(default)))
        except (TypeError, ValueError):
            return default

    try:
        size = os.get_terminal_size()
        return size.columns, size.lines
    except Exception:
        ### No attached terminal; fall back to the environment, then the defaults.
        return _env_int('COLUMNS', default_cols), _env_int('LINES', default_lines)


def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Iterate over a list in chunks.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to iterate over in chunks.

    chunksize: int
        The size of chunks to iterate with.

    fillvalue: Optional[Any], default None
        If the chunks do not evenly divide into the iterable, pad the end with this value.

    Returns
    -------
    A generator of tuples of size `chunksize`.

    """
    from itertools import zip_longest
    ### The same iterator is repeated so each tuple consumes `chunksize` items.
    shared_iterators = [iter(iterable)] * chunksize
    return zip_longest(*shared_iterators, fillvalue=fillvalue)


def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Sort a dictionary's values and return a new dictionary.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A dictionary ordered by value. If sorting fails, `d` itself is returned.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        return dict(sorted(d.items(), key=lambda item: item[1]))
    except Exception:
        return d


def flatten_pipes_dict(pipes_dict: 'PipesDict') -> 'List[mrsm.Pipe]':
    """
    Convert the standard pipes dictionary into a list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The pipes dictionary to be flattened.

    Returns
    -------
    A list of `Pipe` objects.

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]


def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.
    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

    NOTE: Rounding is computed from the seconds elapsed since midnight.
    When `dt` is `None`, a timezone-naive UTC datetime is used.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any other value behaves like `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    if date_delta is None:
        date_delta = timedelta(minutes=1)
    round_to = date_delta.total_seconds()
    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)
    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds

    on_boundary = seconds % round_to == 0 and dt.microsecond == 0
    if to == 'up' and not on_boundary:
        rounding = (seconds + dt.microsecond / 1000000 + round_to) // round_to * round_to
    elif to == 'down' and not on_boundary:
        rounding = seconds // round_to * round_to
    else:
        rounding = (seconds + round_to / 2) // round_to * round_to

    return dt + timedelta(0, rounding - seconds, - dt.microsecond)


def timed_input(
    seconds: int = 10,
    timeout_message: str = "",
    prompt: str = "",
    icon: bool = False,
    **kw
) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        NOTE: Currently not read by this function.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        If `True`, print the configured input icon.
        NOTE: Currently not read by this function.

    Returns
    -------
    The input string entered by the user, or `None` if nothing was entered in time.

    """
    import signal, time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Install our handler and remember the previous one so it can be restored.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(seconds) # produce SIGALRM in `seconds` seconds

    try:
        return input(prompt)
    except TimeoutExpired:
        return None
    except (EOFError, RuntimeError):
        ### stdin is unavailable, so show the prompt and wait out the window.
        try:
            print(prompt)
            time.sleep(seconds)
        except TimeoutExpired:
            return None
    finally:
        signal.alarm(0) # cancel alarm
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    return None


def replace_pipes_in_dict(
    pipes: 'Optional[PipesDict]' = None,
    func: 'function' = str,
    debug: bool = False,
    **kw
) -> 'PipesDict':
    """
    Replace the Pipes in a Pipes dict with the result of another function.

    Parameters
    ----------
    pipes: Optional[PipesDict], default None
        The pipes dict to be processed. If `None`, call `meerschaum.get_pipes()`.

    func: Callable[[Any], Any], default str
        The function to be applied to every pipe.
        Defaults to the string constructor.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary where every pipe is replaced with the output of a function.

    """
    import copy

    def change_dict(d: Dict[Any, Any], func: 'function') -> None:
        for k, v in d.items():
            if isinstance(v, dict):
                change_dict(v, func)
            else:
                d[k] = func(v)

    if pipes is None:
        from meerschaum import get_pipes
        pipes = get_pipes(debug=debug, **kw)

    ### Work on a deep copy so the caller's dictionary is left untouched.
    result = copy.deepcopy(pipes)
    change_dict(result, func)
    return result


def enforce_gevent_monkey_patch():
    """
    Check if gevent monkey patching is enabled, and if not, then apply patching.
    """
    from meerschaum.utils.packages import attempt_import
    import socket
    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    if socket.socket is not gevent_socket.socket:
        gevent_monkey.patch_all()


def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Check whether a string is a valid email.

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    NOTE: The pattern only accepts lowercase letters and digits in the local part
    and a final suffix of 2 or 3 word characters.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>

    """
    import re
    regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
    return re.search(regex, email)


def string_width(string: str, widest: bool = True) -> int:
    """
    Calculate the width of a string by its widest line.

    Parameters
    ----------
    string: str:
        The string to be examined.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer for the length of the longest line.

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    ### `split` always returns at least one element, so `max` is safe.
    return max(len(line) for line in string.split('\n'))


def _pyinstaller_traverse_dir(
    directory: str,
    ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
    include_dotfiles: bool = False
) -> list:
    """
    Recursively traverse a directory and return a list of its contents.

    Each element is a tuple of the file's path and its parent directory
    relative to the parent of `directory`.
    """
    import os, pathlib
    paths = []
    _directory = pathlib.Path(directory)
    parent_prefix_len = len(str(_directory.parent))

    def _found_pattern(name: str) -> bool:
        return any(
            pattern.replace('/', os.path.sep) in str(name)
            for pattern in ignore_patterns
        )

    def _relative_to_parent(full_path: str) -> str:
        """Strip the parent directory (and a leading separator) from a path."""
        relative = str(full_path)[parent_prefix_len:]
        if relative.startswith(os.path.sep):
            relative = relative[len(os.path.sep):]
        return relative

    for root, _dirs, files in os.walk(_directory):
        _root = _relative_to_parent(root)
        if _root.startswith('.') and not include_dotfiles:
            continue
        ### ignore certain patterns
        if _found_pattern(_root):
            continue

        for filename in files:
            if filename.startswith('.') and not include_dotfiles:
                continue
            path = os.path.join(root, filename)
            if _found_pattern(path):
                continue

            ### Keep only the directory portion of the relative path.
            _path = os.path.sep.join(_relative_to_parent(path).split(os.path.sep)[:-1])
            paths.append((path, _path))
    return paths


def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.

    Returns
    -------
    Another dictionary where values to the keys `'password'`
    are replaced with `replace_with` (`'*'`).

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}

    """
    import copy
    _d = copy.deepcopy(d)
    for k, v in d.items():
        if isinstance(v, dict):
            ### Pass `replace_with` along so nested passwords use the same mask.
            _d[k] = replace_password(v, replace_with=replace_with)
        elif 'password' in str(k).lower():
            _d[k] = replace_with * len(str(v))
        elif str(k).lower() == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(v)
            except Exception:
                uri_params = None
            if not uri_params:
                continue
            if 'username' not in uri_params or 'password' not in uri_params:
                continue
            _d[k] = v.replace(
                uri_params['username'] + ':' + uri_params['password'],
                uri_params['username'] + ':' + (
                    replace_with * len(str(uri_params['password']))
                )
            )
    return _d


def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Filter out unsupported positional and keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter and pass to `func`.

    **kwargs
        Keyword arguments to filter and pass to `func`.

    Returns
    -------
    The `args` and `kwargs` accepted by `func`.
    """
    args = filter_positionals(func, *args)
    kwargs = filter_keywords(func, **kwargs)
    return args, kwargs


def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    ### If the function has a **kw parameter, skip filtering.
    ### Inspect the parameter kind rather than its string form,
    ### which may contain '**' inside a default value.
    if any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in func_params.values()
    ):
        return kw

    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return {
        k: v
        for k, v in kw.items()
        if k in func_params and func_params[k].kind in keyword_kinds
    }


def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None`.

    Returns
    -------
    A tuple of positional arguments accepted by `func`.
    Only the leading parameters without default values are considered.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )

    acceptable_args: List[Any] = []
    num_invalid: int = 0
    for i, param in enumerate(func_params.values()):
        ### Stop at the first parameter which is optional or cannot be positional.
        if (
            param.kind not in positional_kinds
            or param.default is not inspect.Parameter.empty
        ):
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Import lazily so the warning machinery loads only when needed.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have' if num_invalid != 1 else " has")
            + " been filled with `None`.",
        )
    return tuple(acceptable_args)


def dict_from_od(od: 'collections.OrderedDict') -> Dict[Any, Any]:
    """
    Convert an ordered dict to a dict.
    Does not mutate the original OrderedDict.
    """
    from collections import OrderedDict
    return {
        k: (dict_from_od(v) if isinstance(v, OrderedDict) else v)
        for k, v in od.items()
    }


def remove_ansi(s: str) -> str:
    """
    Remove ANSI escape characters from a string.

    Parameters
    ----------
    s: str:
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI characters removed.

    Examples
    --------
    >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
    'Hello, World!'

    """
    import re
    return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', s)


def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match = True,
) -> List[str]:
    """
    Read connector labels from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.

    search_term: str, default ''
        A filter on the connectors' labels.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    Returns
    -------
    A list of the keys of defined connectors.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    _types = list(types)
    if len(_types) == 0:
        _types = list(connectors.keys()) + ['plugin']

    conns = []
    for t in _types:
        if t == 'plugin':
            from meerschaum.plugins import get_data_plugins
            conns += [
                f'{t}:' + plugin.module.__name__.split('.')[-1]
                for plugin in get_data_plugins()
            ]
            continue
        conns += [f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default']

    excluded = search_term if ignore_exact_match else ''
    possibilities = [
        c for c in conns
        if c.startswith(search_term) and c != excluded
    ]
    return sorted(possibilities)


def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Returns `None` if `dt` is not a `datetime`.
    Timezone-naive datetimes are suffixed with `'Z'`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None
    tz_suffix = 'Z' if dt.tzinfo is None else ''
    return dt.isoformat() + tz_suffix


def wget(
    url: str,
    dest: Optional[Union[str, 'pathlib.Path']] = None,
    headers: Optional[Dict[str, Any]] = None,
    color: bool = True,
    debug: bool = False,
    **kw: Any
) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib`.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory.

    headers: Optional[Dict[str, Any]], default None
        Headers to send with the request.

    color: bool, default True
        If `debug` is `True`, print color output.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The path to the downloaded file.

    """
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.debug import dprint
    import os, pathlib, re, ssl, sys, urllib.request
    if headers is None:
        headers = {}
    request = urllib.request.Request(url, headers=headers)
    if not color:
        dprint = print
    if debug:
        dprint(f"Downloading from '{url}'...")
    try:
        response = urllib.request.urlopen(request)
    except Exception as e:
        ### NOTE(security): Retry without certificate verification.
        ### The unverified context is scoped to this single request
        ### rather than patched into the `ssl` module for the whole process.
        if debug:
            dprint(f"Request failed ({e}). Retrying without SSL verification...")
        try:
            response = urllib.request.urlopen(
                request,
                context=ssl._create_unverified_context(),
            )
        except Exception as _e:
            print(_e)
            response = None
    if response is None or response.code != 200:
        error_msg = f"Failed to download from '{url}'."
        if color:
            error(error_msg)
        else:
            print(error_msg)
        sys.exit(1)

    with response:
        disposition = response.headers.get('content-disposition', None)
        filenames = (
            re.findall("filename=(.+)", disposition)
            if disposition is not None
            else []
        )
        ### Fall back to the URL when the header carries no filename.
        fname = filenames[0].strip('"') if filenames else url.split('/')[-1]

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        with open(dest, 'wb') as f:
            ### `read()` (not `fp.read()`) decodes chunked transfer encoding.
            f.write(response.read())

    if debug:
        dprint(f"Downloaded file '{dest}'.")

    return dest


def async_wrap(func):
    """
    Run a synchronous function as async.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts optional `loop` and `executor`
    keyword arguments; all other arguments are forwarded to `func`.
    """
    import asyncio
    from functools import wraps, partial

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run


def debug_trace(browser: bool = True):
    """
    Open a web-based debugger to trace the execution of the program.

    This calls `meerschaum.utils.debug.trace`.
    """
    from meerschaum.utils.debug import trace
    trace(browser=browser)


def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: [List[Any]]
        The items to be printed as an English list.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True`.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"

    """
    ### Materialize so that sets, tuples, and generators behave like lists.
    items = list(items)
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    a = and_str if and_ else ''
    c = comma_str if commas else ''

    quoted = [q + str(item) + q for item in items]
    if len(quoted) == 1:
        return quoted[0]

    ### Without 'and', every item is joined the same way.
    if not and_:
        return (c + s).join(quoted)

    if len(quoted) == 2:
        return quoted[0] + s + a + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + a + s + quoted[-1]


def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is an integer, assume it corresponds to minutes.

    Returns
    -------
    A formatted string, fit for human eyes.
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')
    delta_seconds = (
        delta.total_seconds()
        if isinstance(delta, timedelta)
        else (delta * 60)
    )
    return humanfriendly.format_timespan(delta_seconds)


def is_docker_available() -> bool:
    """Check if we can connect to the Docker engine."""
    import subprocess
    try:
        return subprocess.call(
            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        ### E.g. the `docker` executable is not installed.
        return False


def is_android() -> bool:
    """Return `True` if the current platform is Android."""
    import sys
    return hasattr(sys, 'getandroidapilevel')


def is_bcp_available() -> bool:
    """Check if the MSSQL `bcp` utility is installed."""
    import subprocess
    try:
        return subprocess.call(
            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        ) == 0
    except Exception:
        ### E.g. the `bcp` executable is not installed.
        return False


def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file as a list of strings.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is read backwards one byte at a time.
    NOTE: If the file ends with a newline, the final (empty) line counts as a line.
    """
    import os
    # Lines are collected last-to-first and reversed before returning.
    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        # Start at the end of the file and walk towards the beginning.
        read_obj.seek(0, os.SEEK_END)
        # Bytes of the current line, stored in reverse order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                # Reverse the bytes *before* decoding so that
                # multi-byte UTF-8 sequences are restored correctly.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)
        # The file was read completely; whatever remains is the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())
    return list(reversed(list_of_lines))


def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            # `seek` requires an integer, but the estimate grows as a float.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        # When we did not start at the beginning, the first line read may be
        # partial, so require one extra line before trusting the result.
        if pos == 0 or len(lines) > to_read or to_read <= 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3


def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If the sections cannot be shortened any further.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'

    """
    if len(item) < max_len:
        return item

    def _shorten(s: str) -> str:
        return s[:-1] if len(s) > 1 else s

    ### Split on the requested delimiter (an empty delimiter means one section).
    sections = item.split(delimeter) if delimeter else [item]
    available_chars = max_len - len(sections) * max(len(delimeter), 1)

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        ### Trim one character from every section per pass.
        sections = [_shorten(section) for section in sections]
        new_sections_len = sum(len(section) for section in sections)
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)


### NOTE(review): `separate_negation_values` begins here in the original text,
### but its definition runs past the end of the visible source.
### The visible head is preserved verbatim below and must be rejoined
### with the remainder of the definition.
#
# def separate_negation_values(
#         vals: Union[List[str], Tuple[str]],
#         negation_prefix: Optional[str] = None,
#     ) -> Tuple[List[str], List[str]]:
#     """
#     Separate the negated values from the positive ones.
#     Return two lists: positive and negative values.
#
#     Parameters
#     ----------
#     vals: Union[List[str], Tuple[str]]
#         A list of strings to parse.
#
#     negation_prefix: Optional[str], default None
#         Include values that start with this string in the second list.
#         If `None`, use the system default (`_`).
1354 """ 1355 if negation_prefix is None: 1356 from meerschaum.config.static import STATIC_CONFIG 1357 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 1358 _in_vals, _ex_vals = [], [] 1359 for v in vals: 1360 if str(v).startswith(negation_prefix): 1361 _ex_vals.append(str(v)[len(negation_prefix):]) 1362 else: 1363 _in_vals.append(v) 1364 1365 return _in_vals, _ex_vals 1366 1367 1368def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]: 1369 """ 1370 Translate a params dictionary into lists of include- and exclude-values. 1371 1372 Parameters 1373 ---------- 1374 params: Optional[Dict[str, Any]] 1375 A params query dictionary. 1376 1377 Returns 1378 ------- 1379 A dictionary mapping keys to a tuple of lists for include and exclude values. 1380 1381 Examples 1382 -------- 1383 >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']}) 1384 {'a': (['b', 'c', 'e'], ['d', 'f'])} 1385 """ 1386 if not params: 1387 return {} 1388 return { 1389 col: separate_negation_values( 1390 ( 1391 val 1392 if isinstance(val, (list, tuple)) 1393 else [val] 1394 ) 1395 ) 1396 for col, val in params.items() 1397 } 1398 1399 1400def flatten_list(list_: List[Any]) -> List[Any]: 1401 """ 1402 Recursively flatten a list. 1403 """ 1404 for item in list_: 1405 if isinstance(item, list): 1406 yield from flatten_list(item) 1407 else: 1408 yield item 1409 1410 1411def make_symlink(src_path: pathlib.Path, dest_path: pathlib.Path) -> SuccessTuple: 1412 """ 1413 Wrap around `pathlib.Path.symlink_to`, but add support for Windows. 1414 1415 Parameters 1416 ---------- 1417 src_path: pathlib.Path 1418 The source path. 1419 1420 dest_path: pathlib.Path 1421 The destination path. 1422 1423 Returns 1424 ------- 1425 A SuccessTuple indicating success. 1426 """ 1427 if dest_path.exists() and dest_path.resolve() == src_path.resolve(): 1428 return True, "Symlink already exists." 
1429 try: 1430 dest_path.symlink_to(src_path) 1431 success = True 1432 except Exception as e: 1433 success = False 1434 msg = str(e) 1435 if success: 1436 return success, "Success" 1437 1438 ### Failed to create a symlink. 1439 ### If we're not on Windows, return an error. 1440 import platform 1441 if platform.system() != 'Windows': 1442 return success, msg 1443 1444 try: 1445 import _winapi 1446 except ImportError: 1447 return False, "Unable to import _winapi." 1448 1449 if src_path.is_dir(): 1450 try: 1451 _winapi.CreateJunction(str(src_path), str(dest_path)) 1452 except Exception as e: 1453 return False, str(e) 1454 return True, "Success" 1455 1456 ### Last resort: copy the file on Windows. 1457 import shutil 1458 try: 1459 shutil.copy(src_path, dest_path) 1460 except Exception as e: 1461 return False, str(e) 1462 1463 return True, "Success" 1464 1465 1466def is_symlink(path: pathlib.Path) -> bool: 1467 """ 1468 Wrap `path.is_symlink()` but add support for Windows junctions. 1469 """ 1470 if path.is_symlink(): 1471 return True 1472 import platform, os 1473 if platform.system() != 'Windows': 1474 return False 1475 try: 1476 return bool(os.readlink(path)) 1477 except OSError: 1478 return False 1479 1480 1481def parametrized(dec): 1482 """ 1483 A meta-decorator for allowing other decorator functions to have parameters. 1484 1485 https://stackoverflow.com/a/26151604/9699829 1486 """ 1487 def layer(*args, **kwargs): 1488 def repl(f): 1489 return dec(f, *args, **kwargs) 1490 return repl 1491 return layer 1492 1493 1494def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None: 1495 """ 1496 Safely extract a TAR file to a give directory. 1497 This defends against CVE-2007-4559. 1498 1499 Parameters 1500 ---------- 1501 tarf: file 1502 The TAR file opened with `tarfile.open(path, 'r:gz')`. 1503 1504 output_dir: Union[str, pathlib.Path] 1505 The output directory. 
1506 """ 1507 import os 1508 1509 def is_within_directory(directory, target): 1510 abs_directory = os.path.abspath(directory) 1511 abs_target = os.path.abspath(target) 1512 prefix = os.path.commonprefix([abs_directory, abs_target]) 1513 return prefix == abs_directory 1514 1515 def safe_extract(tar, path=".", members=None, *, numeric_owner=False): 1516 for member in tar.getmembers(): 1517 member_path = os.path.join(path, member.name) 1518 if not is_within_directory(path, member_path): 1519 raise Exception("Attempted Path Traversal in Tar File") 1520 1521 tar.extractall(path=path, members=members, numeric_owner=numeric_owner) 1522 1523 return safe_extract(tarf, output_dir) 1524 1525################## 1526# Legacy imports # 1527################## 1528 1529def choose_subaction(*args, **kwargs) -> Any: 1530 """ 1531 Placeholder function to prevent breaking legacy behavior. 1532 See `meerschaum.actions.choose_subaction`. 1533 """ 1534 from meerschaum.actions import choose_subaction as _choose_subactions 1535 return _choose_subactions(*args, **kwargs) 1536 1537 1538def print_options(*args, **kwargs) -> None: 1539 """ 1540 Placeholder function to prevent breaking legacy behavior. 1541 See `meerschaum.utils.formatting.print_options`. 1542 """ 1543 from meerschaum.utils.formatting import print_options as _print_options 1544 return _print_options(*args, **kwargs) 1545 1546 1547def to_pandas_dtype(*args, **kwargs) -> Any: 1548 """ 1549 Placeholder function to prevent breaking legacy behavior. 1550 See `meerschaum.utils.dtypes.to_pandas_dtype`. 1551 """ 1552 from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype 1553 return _to_pandas_dtype(*args, **kwargs) 1554 1555 1556def filter_unseen_df(*args, **kwargs) -> Any: 1557 """ 1558 Placeholder function to prevent breaking legacy behavior. 1559 See `meerschaum.utils.dataframe.filter_unseen_df`. 
1560 """ 1561 from meerschaum.utils.dataframe import filter_unseen_df as real_function 1562 return real_function(*args, **kwargs) 1563 1564 1565def add_missing_cols_to_df(*args, **kwargs) -> Any: 1566 """ 1567 Placeholder function to prevent breaking legacy behavior. 1568 See `meerschaum.utils.dataframe.add_missing_cols_to_df`. 1569 """ 1570 from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function 1571 return real_function(*args, **kwargs) 1572 1573 1574def parse_df_datetimes(*args, **kwargs) -> Any: 1575 """ 1576 Placeholder function to prevent breaking legacy behavior. 1577 See `meerschaum.utils.dataframe.parse_df_datetimes`. 1578 """ 1579 from meerschaum.utils.dataframe import parse_df_datetimes as real_function 1580 return real_function(*args, **kwargs) 1581 1582 1583def df_from_literal(*args, **kwargs) -> Any: 1584 """ 1585 Placeholder function to prevent breaking legacy behavior. 1586 See `meerschaum.utils.dataframe.df_from_literal`. 1587 """ 1588 from meerschaum.utils.dataframe import df_from_literal as real_function 1589 return real_function(*args, **kwargs) 1590 1591 1592def get_json_cols(*args, **kwargs) -> Any: 1593 """ 1594 Placeholder function to prevent breaking legacy behavior. 1595 See `meerschaum.utils.dataframe.get_json_cols`. 1596 """ 1597 from meerschaum.utils.dataframe import get_json_cols as real_function 1598 return real_function(*args, **kwargs) 1599 1600 1601def get_unhashable_cols(*args, **kwargs) -> Any: 1602 """ 1603 Placeholder function to prevent breaking legacy behavior. 1604 See `meerschaum.utils.dataframe.get_unhashable_cols`. 1605 """ 1606 from meerschaum.utils.dataframe import get_unhashable_cols as real_function 1607 return real_function(*args, **kwargs) 1608 1609 1610def enforce_dtypes(*args, **kwargs) -> Any: 1611 """ 1612 Placeholder function to prevent breaking legacy behavior. 1613 See `meerschaum.utils.dataframe.enforce_dtypes`. 
1614 """ 1615 from meerschaum.utils.dataframe import enforce_dtypes as real_function 1616 return real_function(*args, **kwargs) 1617 1618 1619def get_datetime_bound_from_df(*args, **kwargs) -> Any: 1620 """ 1621 Placeholder function to prevent breaking legacy behavior. 1622 See `meerschaum.utils.dataframe.get_datetime_bound_from_df`. 1623 """ 1624 from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function 1625 return real_function(*args, **kwargs) 1626 1627 1628def df_is_chunk_generator(*args, **kwargs) -> Any: 1629 """ 1630 Placeholder function to prevent breaking legacy behavior. 1631 See `meerschaum.utils.dataframe.df_is_chunk_generator`. 1632 """ 1633 from meerschaum.utils.dataframe import df_is_chunk_generator as real_function 1634 return real_function(*args, **kwargs) 1635 1636 1637def choices_docstring(*args, **kwargs) -> Any: 1638 """ 1639 Placeholder function to prevent breaking legacy behavior. 1640 See `meerschaum.actions.choices_docstring`. 1641 """ 1642 from meerschaum.actions import choices_docstring as real_function 1643 return real_function(*args, **kwargs) 1644 1645 1646def _get_subaction_names(*args, **kwargs) -> Any: 1647 """ 1648 Placeholder function to prevent breaking legacy behavior. 1649 See `meerschaum.actions._get_subaction_names`. 1650 """ 1651 from meerschaum.actions import _get_subaction_names as real_function 1652 return real_function(*args, **kwargs) 1653 1654 1655_current_module = sys.modules[__name__] 1656__all__ = tuple( 1657 name 1658 for name, obj in globals().items() 1659 if callable(obj) 1660 and name not in __pdoc__ 1661 and getattr(obj, '__module__', None) == _current_module.__name__ 1662)
def add_method_to_class(
        func: Callable[[Any], Any],
        class_def: 'Class',
        method_name: Optional[str] = None,
        keep_self: Optional[bool] = None,
    ) -> Callable[[Any], Any]:
    """
    Attach `func` to `class_def` as an attribute.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or other object) which receives the new attribute.

    method_name: Optional[str], default None
        The attribute name to assign. If `None`, use `func.__name__`.

    keep_self: Optional[bool], default None
        Whether the attached callable receives its first positional argument (`self`).
        If `False`, attach a wrapper which discards the first argument before calling `func`.
        If `None`, attach that wrapper only when `class_def` is a class.
        For any other value, attach `func` itself.

    Returns
    -------
    The original `func` (not the wrapper).

    """
    from functools import wraps

    @wraps(func)
    def _without_self(self, *args, **kw):
        ### Discard the bound instance and forward everything else.
        return func(*args, **kw)

    attribute_name = func.__name__ if method_name is None else method_name

    if keep_self is False:
        drop_self = True
    elif keep_self is None:
        drop_self = isinstance(class_def, type)
    else:
        drop_self = False

    setattr(class_def, attribute_name, _without_self if drop_self else func)
    return func
Add function `func` to class `class_def`.
Parameters
- func (Callable[[Any], Any]): Function to be added as a method of the class
- class_def (Class): Class to be modified.
- method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
- The modified function object.
def generate_password(length: int = 12) -> str:
    """
    Build a random password from ASCII letters using the `secrets` module.

    Parameters
    ----------
    length : int, default 12
        The number of characters in the password.

    Returns
    -------
    A string of `length` randomly chosen ASCII letters.

    """
    import secrets
    import string

    alphabet = string.ascii_letters
    characters = [secrets.choice(alphabet) for _ in range(length)]
    return ''.join(characters)
Generate a secure password of given length.
Parameters
- length (int, default 12): The length of the password.
Returns
- A random password string.
def is_int(s: str) -> bool:
    """
    Determine whether a value represents a whole number.

    Parameters
    ----------
    s: str
        The value to check.

    Returns
    -------
    `True` if `float(s)` succeeds and the result has no fractional part
    (so `'1.0'` counts), otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        ### Anything `float()` rejects is not a number at all.
        return False

    return as_float.is_integer()
Check if string is an int.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string was able to be cast to an integer.
def string_to_dict(
        params_string: str
    ) -> Dict[str, Any]:
    """
    Parse a string into a dictionary.

    If the string begins with '{', parse as JSON. Otherwise use simple parsing:
    comma-separated `key:value` items, where additional colons create nested
    dictionaries (`a:b:1` becomes `{'a': {'b': 1}}`).

    Parameters
    ----------
    params_string: str
        The string to be parsed.

    Returns
    -------
    The parsed dictionary.
    Values and intermediate (nesting) keys are parsed with `ast.literal_eval`
    and fall back to the raw string; the innermost key is always kept as a string.

    Raises
    ------
    IndexError
        If a non-blank item contains no `:` separator (and therefore no key).

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}
    >>> string_to_dict("a:1,")
    {'a': 1}

    """
    if params_string == "":
        return {}

    import json

    ### Kind of a weird edge case.
    ### In the generated compose file, there is some weird escaping happening,
    ### so the string to be parsed starts and ends with a single quote.
    if (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    ):
        return json.loads(params_string[1:-1])
    if str(params_string).startswith('{'):
        return json.loads(params_string)

    import ast

    def _parse_literal(text: str) -> Any:
        """Evaluate `text` as a Python literal, falling back to the raw string."""
        try:
            return ast.literal_eval(text)
        except Exception:
            return str(text)

    params_dict = {}
    for param in params_string.split(","):
        ### Blank items (e.g. from a trailing or doubled comma) carry no data.
        if not param.strip():
            continue

        *keys, raw_val = param.split(":")
        if not keys:
            ### Same exception type as before, but with an explanation.
            raise IndexError(
                f"Missing key in parameter '{param}' (expected 'key:value')."
            )
        val = _parse_literal(raw_val)

        ### Walk (and create) the nested dictionaries for all but the last key.
        c = params_dict
        for _k in keys[:-1]:
            k = _parse_literal(_k)
            if k not in c:
                c[k] = {}
            c = c[k]

        c[keys[-1]] = val

    return params_dict
Parse a string into a dictionary.
If the string begins with '{', parse as JSON. Otherwise use simple parsing.
Parameters
- params_string (str): The string to be parsed.
Returns
- The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2")
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def parse_config_substitution(
        value: str,
        leading_key: str = 'MRSM',
        begin_key: str = '{',
        end_key: str = '}',
        delimeter: str = ':'
    ) -> List[Any]:
    """
    Parse Meerschaum substitution syntax
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The string to parse.

    leading_key: str, default 'MRSM'
        The prefix which marks a substitution.

    begin_key: str, default '{'
        The opening token which follows `leading_key`.

    end_key: str, default '}'
        The closing token at the end of `value`.

    delimeter: str, default ':'
        The separator between the keys inside the braces.

    Returns
    -------
    The list of keys between `begin_key` and `end_key`.
    If `value` is not of the form `<leading_key><begin_key>...<end_key>`,
    return `value` itself unchanged.
    """
    prefix = leading_key + begin_key
    is_substitution = (
        value.startswith(prefix)
        and value.endswith(end_key)
        ### Guard against the prefix and suffix overlapping (e.g. identical tokens).
        and len(value) >= len(prefix) + len(end_key)
    )
    if not is_substitution:
        return value

    ### Slice with an explicit end index so an empty `end_key` keeps the whole tail.
    inner = value[len(prefix):len(value) - len(end_key)]
    return inner.split(delimeter)
Parse Meerschaum substitution syntax
E.g. MRSM{value1:value2} => ['value1', 'value2']
NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
def edit_file(
        path: Union[pathlib.Path, str],
        default_editor: str = 'pyvim',
        debug: bool = False
    ) -> bool:
    """
    Launch an editor on a file and report whether it exited cleanly.

    The editor is taken from the `EDITOR` environment variable
    (falling back to `default_editor`). If launching it raises an exception,
    `pyvim` is imported via `attempt_import` and run as a Python package instead.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The file to open.

    default_editor: str, default 'pyvim'
        The editor command to run when `EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if the editor process returned an exit code of 0, otherwise `False`.
    """
    import os
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor}'...")
        exit_code = call([editor, path])
    except Exception as e:
        ### The system editor could not be launched, so run pyvim instead.
        if debug:
            dprint(e)
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        exit_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return exit_code == 0
Open a file for editing.
Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
Parameters
- path (Union[pathlib.Path, str]): The path to the file to be edited.
- default_editor (str, default 'pyvim'): If `$EDITOR` is not set, use this instead. If `pyvim` is not installed, it will install it from PyPI.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating the file was successfully edited.
def is_pipe_registered(
        pipe: mrsm.Pipe,
        pipes: PipesDict,
        debug: bool = False
    ) -> bool:
    """
    Determine whether a pipe's keys are present in a nested pipes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `connector_keys`, `metric_key`, and `location_key` are looked up.

    pipes: PipesDict
        The dictionary to search, indexed as `pipes[connector_keys][metric_key][location_key]`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if all three keys are found at their respective levels, otherwise `False`.
    """
    from meerschaum.utils.debug import dprint

    connector_keys = pipe.connector_keys
    metric_key = pipe.metric_key
    location_key = pipe.location_key
    if debug:
        dprint(f'{connector_keys}, {metric_key}, {location_key}')
        dprint(f'{pipe}, {pipes}')

    ### Descend one level at a time, bailing out at the first missing key.
    if connector_keys not in pipes:
        return False
    metrics = pipes[connector_keys]
    if metric_key not in metrics:
        return False
    return location_key in metrics[metric_key]
Check if a Pipe is inside the pipes dictionary.
Parameters
- pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
- pipes (PipesDict): The dictionary to search inside.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating whether the pipe is inside the dictionary.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Look up the terminal's width and height.

    If `os.get_terminal_size()` fails, read the `COLUMNS` and `LINES`
    environment variables instead, using the given defaults when they are unset.

    Parameters
    ----------
    default_cols: int, default 100
        The number of columns to return when the width cannot be determined.

    default_lines: int, default 120
        The number of lines to return when the height cannot be determined.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    import os

    try:
        terminal = os.get_terminal_size()
        dimensions = (terminal.columns, terminal.lines)
    except Exception:
        ### No usable terminal: consult the environment.
        env = os.environ
        dimensions = (
            int(env.get('COLUMNS', str(default_cols))),
            int(env.get('LINES', str(default_lines))),
        )
    return dimensions
Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).
Parameters
- default_cols (int, default 100): If the columns cannot be determined, return this value.
- default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
- A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Yield consecutive fixed-size groups from an iterable.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The items to group.

    chunksize: int
        The number of items in each group.

    fillvalue: Optional[Any], default None
        The value used to pad the final group when the items do not divide evenly.

    Returns
    -------
    An iterator (`itertools.zip_longest`) of tuples of size `chunksize`.

    """
    from itertools import zip_longest

    ### Every slot of each tuple pulls from the same iterator,
    ### so zipping the slots together consumes the items `chunksize` at a time.
    shared = iter(iterable)
    slots = (shared for _ in range(chunksize))
    return zip_longest(*slots, fillvalue=fillvalue)
Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
Parameters
- iterable (Iterable[Any]): The iterable to iterate over in chunks.
- chunksize (int): The size of chunks to iterate with.
- fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
- A generator of tuples of size `chunksize`.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Return a new dictionary whose items are ordered by value.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary ordered by ascending value.
    If sorting fails (e.g. the values cannot be compared), return `d` itself.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        ordered_items = sorted(d.items(), key=lambda pair: pair[1])
    except Exception:
        return d
    return dict(ordered_items)
Sort a dictionary's values and return a new dictionary.
Parameters
- d (Dict[Any, Any]): The dictionary to be sorted.
Returns
- A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]:
    """
    Collect the innermost values of a three-level pipes dictionary into a list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The nested dictionary to flatten.

    Returns
    -------
    A list of the values found at the third level, in iteration order.

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]
Convert the standard pipes dictionary into a list.
Parameters
- pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
- A list of `Pipe` objects.
def round_time(
        dt: Optional[datetime] = None,
        date_delta: Optional[timedelta] = None,
        to: 'str' = 'down'
    ) -> datetime:
    """
    Round a datetime object to a multiple of a timedelta.
    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

    NOTE: Time zone information is ignored when computing the rounding offset,
    and the default `dt` is a naive UTC datetime.

    Parameters
    ----------
    dt: Optional[datetime], default None
        The datetime to round. If `None`, use the current UTC datetime (naive).

    date_delta: Optional[timedelta], default None
        The interval to round to. If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any value other than `'up'` or `'down'` is treated as `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)
    if date_delta is None:
        date_delta = timedelta(minutes=1)

    round_to = date_delta.total_seconds()
    ### Whole seconds elapsed since midnight (the `.seconds` component only).
    seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds

    already_aligned = seconds % round_to == 0 and dt.microsecond == 0
    if not already_aligned and to == 'up':
        rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to
    elif not already_aligned and to == 'down':
        rounding = seconds // round_to * round_to
    else:
        ### Aligned values and 'closest' share the half-interval formula.
        rounding = (seconds + round_to / 2) // round_to * round_to

    offset = timedelta(0, rounding - seconds, - dt.microsecond)
    return dt + offset
Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
NOTE: This function strips timezone information!
Parameters
- dt (Optional[datetime], default None): If `None`, grab the current UTC datetime.
- date_delta (Optional[timedelta], default None): If `None`, use a delta of 1 minute.
- to ('str', default 'down'): Available options are `'up'`, `'down'`, and `'closest'`.
Returns
- A rounded `datetime` object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 15, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 45, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
def timed_input(
        seconds: int = 10,
        timeout_message: str = "",
        prompt: str = "",
        icon: bool = False,
        **kw
    ) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    The timeout is implemented with `signal.SIGALRM`.
    The previously installed `SIGALRM` handler is restored before returning.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        Accepted for compatibility; this implementation does not print it.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        Accepted for compatibility; this implementation does not use it.

    Returns
    -------
    The input string entered by the user, or `None` if the window elapsed
    (or if input could not be read).

    """
    import signal, time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Remember the existing handler so this call does not permanently replace it.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    try:
        signal.alarm(seconds) # produce SIGALRM in `seconds` seconds
        try:
            return input(prompt)
        except TimeoutExpired:
            return None
        except (EOFError, RuntimeError):
            ### Input cannot be read, so show the prompt and wait out the window.
            try:
                print(prompt)
                time.sleep(seconds)
            except TimeoutExpired:
                return None
    finally:
        signal.alarm(0) # cancel alarm
        ### `signal.signal()` returns `None` for handlers not installed from Python,
        ### which cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

    return None
Accept user input only for a brief period of time.
Parameters
- seconds (int, default 10): The number of seconds to wait.
- timeout_message (str, default ''): The message to print after the window has elapsed.
- prompt (str, default ''): The prompt to print during the window.
- icon (bool, default False): If `True`, print the configured input icon.
Returns
- The input string entered by the user.
def replace_pipes_in_dict(
        pipes: Optional[PipesDict] = None,
        func: 'function' = str,
        debug: bool = False,
        **kw
    ) -> PipesDict:
    """
    Build a copy of a pipes dictionary with every non-dictionary value passed through `func`.

    Parameters
    ----------
    pipes: Optional[PipesDict], default None
        The nested dictionary to process.
        If `None`, fetch it with `meerschaum.get_pipes(debug=debug, **kw)`.

    func: Callable[[Any], Any], default str
        The function applied to each non-dictionary value.
        Defaults to the string constructor.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A deep copy of `pipes` in which each non-dictionary value is replaced by `func(value)`.
    The input dictionary is not modified.

    """
    import copy

    def _apply_in_place(mapping: Dict[Any, Any]) -> None:
        ### Only existing keys are reassigned, so iterating while assigning is safe.
        for key, value in mapping.items():
            if isinstance(value, dict):
                _apply_in_place(value)
                continue
            mapping[key] = func(value)

    if pipes is None:
        from meerschaum import get_pipes
        pipes = get_pipes(debug=debug, **kw)

    replaced = copy.deepcopy(pipes)
    _apply_in_place(replaced)
    return replaced
Replace the Pipes in a Pipes dict with the result of another function.
Parameters
- pipes (Optional[PipesDict], default None): The pipes dict to be processed.
- func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary where every pipe is replaced with the output of a function.
def enforce_gevent_monkey_patch():
    """
    Apply gevent's monkey patching unless `socket.socket` is already gevent's socket class.
    """
    import socket
    from meerschaum.utils.packages import attempt_import

    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )

    already_patched = socket.socket is gevent_socket.socket
    if already_patched:
        return
    gevent_monkey.patch_all()
Check if gevent monkey patching is enabled, and if not, then apply patching.
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Test a string against the module's email pattern.

    The pattern accepts lowercase alphanumeric local parts (with at most one
    `.` or `_` separator), a single-label domain, and a suffix of 2 or 3 word characters.

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if the string does not match the pattern,
    otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>

    """
    import re

    email_pattern = re.compile(r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
    return email_pattern.search(email)
Check whether a string is a valid email.
Parameters
- email (str): The string to be examined.
Returns
- None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
    """
    Calculate the width of a string by its widest line.

    Parameters
    ----------
    string: str
        The string to be examined. Lines are separated by `'\\n'`.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer for the text's visual width (the length of the longest line).

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    ### `str.split` always returns at least one element, so `max` never sees an empty sequence.
    return max(len(line) for line in string.split('\n'))
Calculate the width of a string, either by its widest or last line.
Parameters
- string (str): The string to be examined.
- widest (bool, default True): No longer used because `widest` is always assumed to be true.
Returns
- An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def _pyinstaller_traverse_dir(
        directory: str,
        ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
        include_dotfiles: bool = False
    ) -> list:
    """
    Recursively traverse a directory and return a list of its contents.

    Parameters
    ----------
    directory: str
        The directory to walk.

    ignore_patterns: Iterable[str], default ('.pyc', 'dist', 'build', '.git', '.log')
        Substrings which cause a directory or file to be skipped.
        Directories are tested by their path relative to the parent of `directory`;
        files are tested by the full joined path.

    include_dotfiles: bool, default False
        If `False`, skip files and relative directory paths which begin with `'.'`.

    Returns
    -------
    A list of `(path, relative_parent)` tuples, where `path` is the file's path as joined
    from `os.walk` and `relative_parent` is the file's containing directory
    relative to the parent of `directory`.
    """
    import os, pathlib
    base = pathlib.Path(directory)
    sep = os.path.sep
    prefix_len = len(str(base.parent))

    def _relative_to_parent(full_path: str) -> str:
        ### Drop the parent directory's prefix and one leading separator (if present).
        rel = str(full_path)[prefix_len:]
        return rel[len(sep):] if rel.startswith(sep) else rel

    def _is_ignored(name: str) -> bool:
        return any(
            pattern.replace('/', sep) in str(name)
            for pattern in ignore_patterns
        )

    collected = []
    for root, _dirs, filenames in os.walk(base):
        rel_root = _relative_to_parent(root)
        if rel_root.startswith('.') and not include_dotfiles:
            continue
        if _is_ignored(rel_root):
            continue

        for filename in filenames:
            if filename.startswith('.') and not include_dotfiles:
                continue
            full_path = os.path.join(root, filename)
            if _is_ignored(full_path):
                continue

            rel_parent = sep.join(_relative_to_parent(full_path).split(sep)[:-1])
            collected.append((full_path, rel_parent))
    return collected
Recursively traverse a directory and return a list of its contents.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        This is also applied to nested dictionaries.

    Returns
    -------
    Another dictionary (a deep copy of `d`) where the values of keys containing `'password'`
    (case-insensitive) are replaced with `replace_with` repeated once per character.
    For keys named `'uri'` (case-insensitive), the `username:password` portion of the URI
    is masked when `SQLConnector.parse_uri` yields both a username and a password.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy
    _d = copy.deepcopy(d)
    for k, v in d.items():
        if isinstance(v, dict):
            ### Forward `replace_with` so that nested passwords use the same mask.
            _d[k] = replace_password(v, replace_with=replace_with)
            continue

        key = str(k).lower()
        if 'password' in key:
            _d[k] = replace_with * len(str(v))
            continue

        if key != 'uri':
            continue

        from meerschaum.connectors.sql import SQLConnector
        try:
            uri_params = SQLConnector.parse_uri(v)
        except Exception:
            ### Best-effort: leave unparsable URIs untouched.
            uri_params = None
        if not uri_params:
            continue
        if 'username' not in uri_params or 'password' not in uri_params:
            continue

        username, password = uri_params['username'], uri_params['password']
        _d[k] = v.replace(
            username + ':' + password,
            username + ':' + (replace_with * len(str(password)))
        )
    return _d
Recursively replace passwords in a dictionary.
Parameters
- d (Dict[str, Any]): The dictionary to search through.
- replace_with (str, default '*'): The string to replace each character of the password with.
Returns
- Another dictionary where values to the keys `'password'` are replaced with `replace_with` (`'*'`).
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Filter out unsupported positional and keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter (see `filter_positionals`).

    **kwargs
        Keyword arguments to filter (see `filter_keywords`).

    Returns
    -------
    A tuple of the `args` and `kwargs` accepted by `func`.
    """
    accepted_positionals = filter_positionals(func, *args)
    accepted_keywords = filter_keywords(func, **kwargs)
    return accepted_positionals, accepted_keywords
Filter out unsupported positional and keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): Positional arguments to filter and pass to `func`.
- **kwargs: Keyword arguments to filter and pass to `func`.
Returns
- The `args` and `kwargs` accepted by `func`.
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, `kw` is returned unfiltered.
    Otherwise only names of parameters which may be passed by keyword are kept
    (names of positional-only and `*args` parameters are dropped).

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    func_params = inspect.signature(func).parameters
    Parameter = inspect.Parameter

    ### If the function has a **kw parameter, skip filtering.
    ### NOTE: Inspect the parameter kind rather than its string form,
    ###       which may contain '**' inside a default value or annotation.
    if any(param.kind is Parameter.VAR_KEYWORD for param in func_params.values()):
        return kw

    keyword_kinds = (Parameter.POSITIONAL_OR_KEYWORD, Parameter.KEYWORD_ONLY)
    return {
        k: v
        for k, v in kw.items()
        if k in func_params and func_params[k].kind in keyword_kinds
    }
Filter out unsupported keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- **kw (Any):
The arguments to be filtered and passed into
func
.
Returns
- A dictionary of keyword arguments accepted by
func
.
Examples
>>> def foo(a=1, b=2):
... return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`.
    Only the leading required positional parameters are filled:
    collection stops at the first parameter which has a default value
    or which cannot be passed positionally (`*args`, keyword-only, `**kwargs`).

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect
    Parameter = inspect.Parameter
    positional_kinds = (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)

    acceptable_args: List[Any] = []
    num_missing: int = 0
    for i, param in enumerate(inspect.signature(func).parameters.values()):
        ### NOTE: Check the parameter's kind and default rather than its string form;
        ###       a keyword-only parameter without a default renders without '=' or '*'.
        if param.kind not in positional_kinds or param.default is not Parameter.empty:
            break

        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_missing += 1

    if num_missing > 0:
        ### Import lazily so the warning machinery is only loaded when needed.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_missing} argument"
            + ('s have' if num_missing != 1 else ' has')
            + " been filled with `None`.",
        )

    return tuple(acceptable_args)
Filter out unsupported positional arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): The arguments to be filtered and passed into `func`. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with `None`.
Returns
- A tuple of positional arguments accepted by
func
.
Examples
>>> def foo(a, b):
... return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Convert an ordered dict to a dict.
    Does not mutate the original OrderedDict.

    Values which are themselves `OrderedDict` instances (including subclasses)
    are converted recursively; all other values are carried over as-is.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }
Convert an ordered dict to a dict. Does not mutate the original OrderedDict.
def remove_ansi(s : str) -> str:
    """
    Remove ANSI escape characters from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI escape sequences removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    ### ESC followed by either a single control character or a `[`-introduced sequence.
    ansi_pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(ansi_pattern, '', s)
Remove ANSI escape characters from a string.
Parameters
- s (str): The string to be cleaned.
Returns
- A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(
        *types: str,
        search_term: str = '',
        ignore_exact_match = True,
    ) -> List[str]:
    """
    Read connector labels from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use the types defined in the configuration and `'plugin'`.

    search_term: str, default ''
        A filter on the connectors' keys: only keys which begin with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    Returns
    -------
    A sorted list of the keys (`'type:label'`) of defined connectors.
    Labels named `'default'` are omitted.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types) or (list(connectors.keys()) + ['plugin'])

    labels = []
    for conn_type in requested_types:
        if conn_type != 'plugin':
            labels.extend(
                f'{conn_type}:{label}'
                for label in connectors.get(conn_type, {})
                if label != 'default'
            )
            continue

        ### Plugin connectors are named after the final component of the plugin's module name.
        from meerschaum.plugins import get_data_plugins
        labels.extend(
            f'{conn_type}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        )

    excluded = search_term if ignore_exact_match else ''
    return sorted(
        label
        for label in labels
        if label.startswith(search_term) and label != excluded
    )
Read connector labels from the configuration dictionary.
Parameters
- *types (str): The connector types. If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
- search_term (str, default ''): A filter on the connectors' labels.
- ignore_exact_match (bool, default True): If `True`, skip a connector if the search_term is an exact match.
Returns
- A list of the keys of defined connectors.
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Timezone-naive datetimes receive a `'Z'` suffix;
    timezone-aware datetimes keep the offset produced by `isoformat()`.
    Anything which is not a `datetime` yields `None`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None
    iso_str = dt.isoformat()
    if dt.tzinfo is None:
        return iso_str + 'Z'
    return iso_str
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def wget(
        url: str,
        dest: Optional[Union[str, 'pathlib.Path']] = None,
        headers: Optional[Dict[str, Any]] = None,
        color: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib.request`.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory, using the file name from the
        `content-disposition` header if available, otherwise the last segment of `url`.

    headers: Optional[Dict[str, Any]], default None
        Request headers to send along with the request.

    color: bool, default True
        If `True`, report through `dprint` / `error`; otherwise use plain `print`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Accepted for compatibility and ignored.

    Returns
    -------
    The path to the downloaded file.

    Notes
    -----
    If the request cannot be opened or the response code is not 200, the failure is reported
    (via `error()` when `color` is `True`, otherwise `print`) and `sys.exit(1)` is called.

    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    import os, pathlib, re, shutil, urllib.request
    if headers is None:
        headers = {}
    request = urllib.request.Request(url, headers=headers)
    if not color:
        dprint = print
    if debug:
        dprint(f"Downloading from '{url}'...")
    try:
        response = urllib.request.urlopen(request)
    except Exception:
        ### NOTE(security): Retry once WITHOUT certificate verification.
        ###   The unverified context is scoped to this single request
        ###   instead of replacing the process-wide default HTTPS context.
        import ssl
        try:
            response = urllib.request.urlopen(
                request,
                context=ssl._create_unverified_context(),
            )
        except Exception as _e:
            print(_e)
            response = None
    if response is None or response.code != 200:
        if response is not None:
            response.close()
        error_msg = f"Failed to download from '{url}'."
        if color:
            error(error_msg)
        else:
            print(error_msg)
        import sys
        sys.exit(1)

    ### Ensure the connection is released once the body is written.
    with response:
        d = response.headers.get('content-disposition', None)
        ### The header may be present without a `filename=` component.
        filenames = re.findall("filename=(.+)", d) if d is not None else []
        fname = filenames[0].strip('"') if filenames else url.split('/')[-1]

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        ### Stream through the response object (not its raw `fp`)
        ### so the body is read via `response.read()` in chunks.
        with open(dest, 'wb') as f:
            shutil.copyfileobj(response, f)

    if debug:
        dprint(f"Downloaded file '{dest}'.")

    return dest
Mimic wget
with requests
.
Parameters
- url (str): The URL to the resource to be downloaded.
- dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If `None`, save to the current directory.
- color (bool, default True): If `debug` is `True`, print color output.
- debug (bool, default False): Verbosity toggle.
Returns
- The path to the downloaded file.
def async_wrap(func):
    """
    Run a synchronous function as async.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts the same arguments as `func`,
    plus the keyword-only arguments `loop` (defaults to `asyncio.get_event_loop()`)
    and `executor` (passed to `loop.run_in_executor`; `None` uses the loop's default executor).
    """
    import asyncio
    from functools import wraps, partial

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        target_loop = asyncio.get_event_loop() if loop is None else loop
        bound_call = partial(func, *args, **kwargs)
        result = await target_loop.run_in_executor(executor, bound_call)
        return result

    return run
Run a synchronous function as async. https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
def debug_trace(browser: bool = True):
    """
    Open a web-based debugger to trace the execution of the program.

    This is a thin wrapper which forwards `browser` to `meerschaum.utils.debug.trace`.
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
Open a web-based debugger to trace the execution of the program.
This is an alias import for debug_trace
.
def items_str(
        items: List[Any],
        quotes: bool = True,
        quote_str: str = "'",
        commas: bool = True,
        comma_str: str = ',',
        and_: bool = True,
        and_str: str = 'and',
        oxford_comma: bool = True,
        spaces: bool = True,
        space_str = ' ',
    ) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: List[Any]
        The items to be printed as an English list. Any iterable is accepted.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.
        If `False`, every item is separated by the comma and space strings only.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True` and there are at least three items.

    spaces: bool, default True
        If `True`, separate items with `space_str`

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"

    """
    ### Materialize first so that sets, tuples, and generators behave like lists.
    items = list(items)
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    c = comma_str if commas else ''

    quoted = [q + str(item) + q for item in items]
    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item (including the last) is comma-separated.
    if not and_:
        return (c + s).join(quoted)

    ### Two items never take a comma: "'1' and '2'".
    if len(quoted) == 2:
        return quoted[0] + s + and_str + s + quoted[1]

    output = (c + s).join(quoted[:-1])
    if oxford_comma:
        output += c
    return output + s + and_str + s + quoted[-1]
Return a formatted string of list items separated by commas.
Parameters
- items (List[Any]): The items to be printed as an English list.
- quotes (bool, default True): If `True`, wrap items in quotes.
- quote_str (str, default "'"): If `quotes` is `True`, prepend and append each item with this string.
- and_ (bool, default True): If `True`, include the word 'and' before the final item in the list.
- and_str (str, default 'and'): If `and_` is True, insert this string where 'and' normally would in an English list.
- oxford_comma (bool, default True): If `True`, include the Oxford Comma (comma before the final 'and'). Only applies when `and_` is `True`.
- spaces (bool, default True): If `True`, separate items with `space_str`.
- space_str (str, default ' '): If `spaces` is `True`, separate items with this string.
Returns
- A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Return a human-readable string for a `timedelta` (or `int` minutes).

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is not a `timedelta`,
        it is treated as a number of minutes.

    Returns
    -------
    A formatted string, fit for human eyes (produced by `humanfriendly.format_timespan`).
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')
    if isinstance(delta, timedelta):
        delta_seconds = delta.total_seconds()
    else:
        ### Minutes to seconds.
        delta_seconds = delta * 60
    return humanfriendly.format_timespan(delta_seconds)
Return a human-readable string for a timedelta
(or int
minutes).
Parameters
- delta (Union[timedelta, int]):
The interval to print. If
delta
is an integer, assume it corresponds to minutes.
Returns
- A formatted string, fit for human eyes.
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Returns `True` only when `docker ps` exits with status 0.
    Any exception while launching the command (e.g. `docker` is not installed)
    is treated as "not available".
    """
    import subprocess
    try:
        return_code = subprocess.call(
            ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
    except Exception:
        return False
    return return_code == 0
Check if we can connect to the Docker engine.
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    Detection relies on the presence of `sys.getandroidapilevel`.
    """
    import sys as _sys
    return hasattr(_sys, 'getandroidapilevel')
Return True
if the current platform is Android.
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Returns `True` only when `bcp -v` exits with status 0.
    Any exception while launching the command (e.g. `bcp` is not on the `PATH`)
    is treated as "not available".
    """
    import subprocess
    try:
        return_code = subprocess.call(
            ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
        )
    except Exception:
        return False
    return return_code == 0
Check if the MSSQL bcp
utility is installed.
def get_last_n_lines(file_name: str, N: int):
    """
    Return the last `N` lines of a file (like the `tail` command).
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is opened in binary mode and scanned backwards one byte at a time,
    so only the tail of the file is read when `N` lines are found.

    Parameters
    ----------
    file_name: str
        The path to the file to read.

    N: int
        The maximum number of lines to return. If `N <= 0`, an empty list is returned.

    Returns
    -------
    A list of at most `N` strings (without newline characters), in file order.
    Lines are decoded with the default codec of `bytes.decode()` (UTF-8).
    NOTE: If the file ends with a newline, the final element is an empty string,
    and it counts towards `N`.
    """
    import os
    if N <= 0:
        return []

    list_of_lines = []
    with open(file_name, 'rb') as read_obj:
        ### Start at the end of the file and walk backwards.
        read_obj.seek(0, os.SEEK_END)
        ### Bytes of the current line, collected in REVERSE order.
        buffer = bytearray()
        pointer_location = read_obj.tell()
        while pointer_location >= 0:
            read_obj.seek(pointer_location)
            pointer_location -= 1
            new_byte = read_obj.read(1)
            if new_byte == b'\n':
                ### Restore the byte order BEFORE decoding so that
                ### multi-byte characters remain valid.
                list_of_lines.append(buffer[::-1].decode())
                if len(list_of_lines) == N:
                    return list(reversed(list_of_lines))
                buffer = bytearray()
            else:
                buffer.extend(new_byte)

        ### The start of the file was reached: whatever remains is the first line.
        if len(buffer) > 0:
            list_of_lines.append(buffer[::-1].decode())

    return list(reversed(list_of_lines))
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f: file
        An open, seekable file object. It should be opened in binary mode:
        if seeking relative to the end of the file fails (`OSError` / `ValueError`),
        the whole file is read from the beginning instead.

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of lines at the end of the file to skip.

    Returns
    -------
    A tuple of `(lines, has_more)`, where `lines` are the elements of
    `f.read().splitlines()` selected by `n` and `offset`.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### NOTE: The estimate becomes a float after scaling,
            ###       but `seek()` only accepts integers.
            f.seek(-int(avg_line_length * to_read), 2)
        except (OSError, ValueError):
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### Unless reading began at the start of the file, the first line may be partial,
        ### so require one extra line before trusting the last `to_read` lines.
        if len(lines) > to_read or pos == 0:
            return (
                lines[-to_read:(-offset if offset else None)],
                len(lines) > to_read or pos > 0
            )
        ### Not enough lines were read: assume longer lines and step back further.
        avg_line_length *= 1.3
https://stackoverflow.com/a/692616/9699829
Reads n lines from f with an offset of offset lines. The return value is a tuple in the form `(lines, has_more)` where `has_more` is an indicator that is `True` if there are more lines in the file.
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.

    delimeter: str, default '_'
        Split `item` by this string into several sections.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.
        Strings which are strictly shorter than `max_len` are returned unchanged.

    Returns
    -------
    The truncated string: on each pass, the last character of every section
    longer than one character is dropped, and the sections are re-joined with `delimeter`.

    Raises
    ------
    Exception
        If the sections cannot be shortened any further but are still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    ### Split on the requested delimiter (an empty delimiter yields a single section).
    sections = item.split(delimeter) if delimeter else [item]

    ### Reserve one delimiter's worth of characters per section.
    available_chars = max_len - (len(delimeter) * len(sections))

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        shortened_len = sum(len(section) for section in sections)
        ### No progress means every section is already at its minimum length.
        if shortened_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = shortened_len

    return delimeter.join(sections)
Remove characters from each section of a string until the length is within the limit.
Parameters
- item (str): The item name to be truncated.
- delimeter (str, default '_'): Split `item` by this string into several sections.
- max_len (int, default 128): The max acceptable length of the truncated version of `item`.
Returns
- The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def separate_negation_values(
        vals: Union[List[str], Tuple[str]],
        negation_prefix: Optional[str] = None,
    ) -> Tuple[List[str], List[str]]:
    """
    Separate the negated values from the positive ones.
    Return two lists: positive and negative values.

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of strings to parse.

    negation_prefix: Optional[str], default None
        Include values that start with this string in the second list.
        If `None`, use the system default (`_`).

    Returns
    -------
    A tuple of two lists: the values without the prefix (unchanged),
    and the negated values cast to strings with the prefix removed.
    """
    if negation_prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    prefix_len = len(negation_prefix)
    included, excluded = [], []
    for val in vals:
        val_str = str(val)
        if not val_str.startswith(negation_prefix):
            included.append(val)
            continue
        excluded.append(val_str[prefix_len:])

    return included, excluded
Separate the negated values from the positive ones. Return two lists: positive and negative values.
Parameters
- vals (Union[List[str], Tuple[str]]): A list of strings to parse.
- negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If `None`, use the system default (`_`).
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary.
        Values which are not lists or tuples are treated as single-item lists.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values
    (see `separate_negation_values`). An empty dictionary is returned for falsy `params`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    if not params:
        return {}

    in_ex_params = {}
    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params
Translate a params dictionary into lists of include- and exclude-values.
Parameters
- params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
- A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> Generator[Any, None, None]:
    """
    Recursively flatten a list.

    NOTE: This is a generator: items are yielded lazily, so wrap the call in `list()`
    if a list is needed. Only `list` instances are descended into;
    tuples, sets, and other containers are yielded as-is.

    Parameters
    ----------
    list_: List[Any]
        The (possibly nested) list to flatten.

    Returns
    -------
    A generator over the non-list items, in depth-first order.

    Examples
    --------
    >>> list(flatten_list([1, [2, [3, 4]], 5]))
    [1, 2, 3, 4, 5]
    """
    ### Keep an explicit stack of iterators (rather than recursing)
    ### so that deeply nested lists do not hit the interpreter's recursion limit.
    stack = [iter(list_)]
    while stack:
        for item in stack[-1]:
            if isinstance(item, list):
                ### Descend; the parent iterator resumes where it left off.
                stack.append(iter(item))
                break
            yield item
        else:
            ### The innermost iterator is exhausted.
            stack.pop()
Recursively flatten a list.
def make_symlink(src_path: pathlib.Path, dest_path: pathlib.Path) -> SuccessTuple:
    """
    Wrap around `pathlib.Path.symlink_to`, but add support for Windows.

    On Windows, if the symlink cannot be created, a junction is created for directories,
    and files are copied as a last resort.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path (the target of the link).

    dest_path: pathlib.Path
        The destination path (where the link is created).

    Returns
    -------
    A SuccessTuple indicating success.
    """
    if dest_path.exists() and dest_path.resolve() == src_path.resolve():
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        msg = str(e)
    else:
        return True, "Success"

    ### Failed to create a symlink.
    ### If we're not on Windows, return an error.
    import platform
    if platform.system() != 'Windows':
        return False, msg

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    if not src_path.is_dir():
        ### Last resort: copy the file on Windows.
        import shutil
        try:
            shutil.copy(src_path, dest_path)
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Directories may be linked with a junction instead.
    try:
        _winapi.CreateJunction(str(src_path), str(dest_path))
    except Exception as e:
        return False, str(e)
    return True, "Success"
Wrap around pathlib.Path.symlink_to
, but add support for Windows.
Parameters
- src_path (pathlib.Path): The source path.
- dest_path (pathlib.Path): The destination path.
Returns
- A SuccessTuple indicating success.
def is_symlink(path: pathlib.Path) -> bool:
    """
    Wrap `path.is_symlink()` but add support for Windows junctions.

    On Windows, a path is also considered a link if `os.readlink` returns a non-empty target.
    """
    if path.is_symlink():
        return True

    import platform, os
    if platform.system() != 'Windows':
        return False

    try:
        target = os.readlink(path)
    except OSError:
        return False
    return bool(target)
Wrap path.is_symlink()
but add support for Windows junctions.
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    The decorated decorator `dec` must accept the function to wrap as its first argument,
    followed by the decorator's own arguments:
    `@dec(*args, **kwargs)` applied to `f` results in `dec(f, *args, **kwargs)`.

    https://stackoverflow.com/a/26151604/9699829
    """
    def layer(*dec_args, **dec_kwargs):
        return lambda f: dec(f, *dec_args, **dec_kwargs)
    return layer
A meta-decorator for allowing other decorator functions to have parameters.
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against CVE-2007-4559.

    Every member's destination path is resolved before anything is extracted;
    if any member would land outside of `output_dir`, nothing is extracted.

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If a member's path resolves to a location outside of `output_dir`.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def is_within_directory(target) -> bool:
        abs_target = os.path.abspath(target)
        ### NOTE: `commonpath` compares whole path components.
        ###       `commonprefix` compares characters and would accept
        ###       a sibling such as '/tmp/out_evil' for the directory '/tmp/out'.
        try:
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            ### E.g. paths on different drives: definitely not inside the directory.
            return False

    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir)
Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.
Parameters
- tarf (file):
The TAR file opened with `tarfile.open(path, 'r:gz')`.
- output_dir (Union[str, pathlib.Path]): The output directory.
def choose_subaction(*args, **kwargs) -> Any:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.actions.choose_subaction`.

    All positional and keyword arguments are forwarded unchanged,
    and the wrapped function's result is returned.
    """
    from meerschaum.actions import choose_subaction as _actions_choose_subaction
    return _actions_choose_subaction(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See choose_subaction
.
def print_options(*args, **kwargs) -> None:
    """
    Placeholder function to prevent breaking legacy behavior.
    See `meerschaum.utils.formatting.print_options`.

    All positional and keyword arguments are forwarded unchanged,
    and the wrapped function's result is returned.
    """
    from meerschaum.utils.formatting import print_options as _formatting_print_options
    return _formatting_print_options(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See meerschaum.utils.formatting.print_options
.