meerschaum.utils.misc
Miscellaneous functions go here
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4""" 5Miscellaneous functions go here 6""" 7 8from __future__ import annotations 9import sys 10from datetime import timedelta, datetime, timezone 11from meerschaum.utils.typing import ( 12 Union, 13 Any, 14 Callable, 15 Optional, 16 List, 17 Dict, 18 SuccessTuple, 19 Iterable, 20 PipesDict, 21 Tuple, 22 InstanceConnector, 23 Hashable, 24 Generator, 25 Iterator, 26 TYPE_CHECKING, 27) 28import meerschaum as mrsm 29if TYPE_CHECKING: 30 import collections 31 32__pdoc__: Dict[str, bool] = { 33 'to_pandas_dtype': False, 34 'filter_unseen_df': False, 35 'add_missing_cols_to_df': False, 36 'parse_df_datetimes': False, 37 'df_from_literal': False, 38 'get_json_cols': False, 39 'get_unhashable_cols': False, 40 'enforce_dtypes': False, 41 'get_datetime_bound_from_df': False, 42 'df_is_chunk_generator': False, 43 'choices_docstring': False, 44 '_get_subaction_names': False, 45} 46 47 48def add_method_to_class( 49 func: Callable[[Any], Any], 50 class_def: 'Class', 51 method_name: Optional[str] = None, 52 keep_self: Optional[bool] = None, 53 ) -> Callable[[Any], Any]: 54 """ 55 Add function `func` to class `class_def`. 56 57 Parameters 58 ---------- 59 func: Callable[[Any], Any] 60 Function to be added as a method of the class 61 62 class_def: Class 63 Class to be modified. 64 65 method_name: Optional[str], default None 66 New name of the method. None will use func.__name__ (default). 67 68 Returns 69 ------- 70 The modified function object. 
71 72 """ 73 from functools import wraps 74 75 is_class = isinstance(class_def, type) 76 77 @wraps(func) 78 def wrapper(self, *args, **kw): 79 return func(*args, **kw) 80 81 if method_name is None: 82 method_name = func.__name__ 83 84 setattr(class_def, method_name, ( 85 wrapper if ((is_class and keep_self is None) or keep_self is False) else func 86 ) 87 ) 88 89 return func 90 91 92def generate_password(length: int = 12) -> str: 93 """Generate a secure password of given length. 94 95 Parameters 96 ---------- 97 length : int, default 12 98 The length of the password. 99 100 Returns 101 ------- 102 A random password string. 103 104 """ 105 import secrets, string 106 return ''.join((secrets.choice(string.ascii_letters) for i in range(length))) 107 108def is_int(s : str) -> bool: 109 """ 110 Check if string is an int. 111 112 Parameters 113 ---------- 114 s: str 115 The string to be checked. 116 117 Returns 118 ------- 119 A bool indicating whether the string was able to be cast to an integer. 120 121 """ 122 try: 123 float(s) 124 except Exception as e: 125 return False 126 127 return float(s).is_integer() 128 129 130def string_to_dict( 131 params_string: str 132 ) -> Dict[str, Any]: 133 """ 134 Parse a string into a dictionary. 135 136 If the string begins with '{', parse as JSON. Otherwise use simple parsing. 137 138 Parameters 139 ---------- 140 params_string: str 141 The string to be parsed. 142 143 Returns 144 ------- 145 The parsed dictionary. 146 147 Examples 148 -------- 149 >>> string_to_dict("a:1,b:2") 150 {'a': 1, 'b': 2} 151 >>> string_to_dict('{"a": 1, "b": 2}') 152 {'a': 1, 'b': 2} 153 154 """ 155 if params_string == "": 156 return {} 157 158 import json 159 160 ### Kind of a weird edge case. 161 ### In the generated compose file, there is some weird escaping happening, 162 ### so the string to be parsed starts and ends with a single quote. 
163 if ( 164 isinstance(params_string, str) 165 and len(params_string) > 4 166 and params_string[1] == "{" 167 and params_string[-2] == "}" 168 ): 169 return json.loads(params_string[1:-1]) 170 if str(params_string).startswith('{'): 171 return json.loads(params_string) 172 173 import ast 174 params_dict = {} 175 for param in params_string.split(","): 176 _keys = param.split(":") 177 keys = _keys[:-1] 178 try: 179 val = ast.literal_eval(_keys[-1]) 180 except Exception as e: 181 val = str(_keys[-1]) 182 183 c = params_dict 184 for _k in keys[:-1]: 185 try: 186 k = ast.literal_eval(_k) 187 except Exception as e: 188 k = str(_k) 189 if k not in c: 190 c[k] = {} 191 c = c[k] 192 193 c[keys[-1]] = val 194 195 return params_dict 196 197 198def parse_config_substitution( 199 value: str, 200 leading_key: str = 'MRSM', 201 begin_key: str = '{', 202 end_key: str = '}', 203 delimeter: str = ':' 204 ) -> List[Any]: 205 """ 206 Parse Meerschaum substitution syntax 207 E.g. MRSM{value1:value2} => ['value1', 'value2'] 208 NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`. 209 """ 210 if not value.beginswith(leading_key): 211 return value 212 213 return leading_key[len(leading_key):][len():-1].split(delimeter) 214 215 216def edit_file( 217 path: Union['pathlib.Path', str], 218 default_editor: str = 'pyvim', 219 debug: bool = False 220) -> bool: 221 """ 222 Open a file for editing. 223 224 Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`. 225 226 Parameters 227 ---------- 228 path: Union[pathlib.Path, str] 229 The path to the file to be edited. 230 231 default_editor: str, default 'pyvim' 232 If `$EDITOR` is not set, use this instead. 233 If `pyvim` is not installed, it will install it from PyPI. 234 235 debug: bool, default False 236 Verbosity toggle. 237 238 Returns 239 ------- 240 A bool indicating the file was successfully edited. 
241 """ 242 import os 243 from subprocess import call 244 from meerschaum.utils.debug import dprint 245 from meerschaum.utils.packages import run_python_package, attempt_import, package_venv 246 try: 247 EDITOR = os.environ.get('EDITOR', default_editor) 248 if debug: 249 dprint(f"Opening file '{path}' with editor '{EDITOR}'...") 250 rc = call([EDITOR, path]) 251 except Exception as e: ### can't open with default editors 252 if debug: 253 dprint(str(e)) 254 dprint('Failed to open file with system editor. Falling back to pyvim...') 255 pyvim = attempt_import('pyvim', lazy=False) 256 rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug) 257 return rc == 0 258 259 260def is_pipe_registered( 261 pipe: mrsm.Pipe, 262 pipes: PipesDict, 263 debug: bool = False 264) -> bool: 265 """ 266 Check if a Pipe is inside the pipes dictionary. 267 268 Parameters 269 ---------- 270 pipe: meerschaum.Pipe 271 The pipe to see if it's in the dictionary. 272 273 pipes: PipesDict 274 The dictionary to search inside. 275 276 debug: bool, default False 277 Verbosity toggle. 278 279 Returns 280 ------- 281 A bool indicating whether the pipe is inside the dictionary. 282 """ 283 from meerschaum.utils.debug import dprint 284 ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key 285 if debug: 286 dprint(f'{ck}, {mk}, {lk}') 287 dprint(f'{pipe}, {pipes}') 288 return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk] 289 290 291def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]: 292 """ 293 Determine the columns and lines in the terminal. 294 If they cannot be determined, return the default values (100 columns and 120 lines). 295 296 Parameters 297 ---------- 298 default_cols: int, default 100 299 If the columns cannot be determined, return this value. 300 301 default_lines: int, default 120 302 If the lines cannot be determined, return this value. 
303 304 Returns 305 ------- 306 A tuple if integers for the columns and lines. 307 """ 308 import os 309 try: 310 size = os.get_terminal_size() 311 _cols, _lines = size.columns, size.lines 312 except Exception as e: 313 _cols, _lines = ( 314 int(os.environ.get('COLUMNS', str(default_cols))), 315 int(os.environ.get('LINES', str(default_lines))), 316 ) 317 return _cols, _lines 318 319 320def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None): 321 """ 322 Iterate over a list in chunks. 323 https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks 324 325 Parameters 326 ---------- 327 iterable: Iterable[Any] 328 The iterable to iterate over in chunks. 329 330 chunksize: int 331 The size of chunks to iterate with. 332 333 fillvalue: Optional[Any], default None 334 If the chunks do not evenly divide into the iterable, pad the end with this value. 335 336 Returns 337 ------- 338 A generator of tuples of size `chunksize`. 339 340 """ 341 from itertools import zip_longest 342 args = [iter(iterable)] * chunksize 343 return zip_longest(*args, fillvalue=fillvalue) 344 345def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]: 346 """ 347 Sort a dictionary's values and return a new dictionary. 348 349 Parameters 350 ---------- 351 d: Dict[Any, Any] 352 The dictionary to be sorted. 353 354 Returns 355 ------- 356 A sorted dictionary. 357 358 Examples 359 -------- 360 >>> sorted_dict({'b': 1, 'a': 2}) 361 {'b': 1, 'a': 2} 362 >>> sorted_dict({'b': 2, 'a': 1}) 363 {'a': 1, 'b': 2} 364 365 """ 366 try: 367 return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])} 368 except Exception as e: 369 return d 370 371def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]: 372 """ 373 Convert the standard pipes dictionary into a list. 374 375 Parameters 376 ---------- 377 pipes_dict: PipesDict 378 The pipes dictionary to be flattened. 379 380 Returns 381 ------- 382 A list of `Pipe` objects. 
383 384 """ 385 pipes_list = [] 386 for ck in pipes_dict.values(): 387 for mk in ck.values(): 388 pipes_list += list(mk.values()) 389 return pipes_list 390 391 392def round_time( 393 dt: Optional[datetime] = None, 394 date_delta: Optional[timedelta] = None, 395 to: 'str' = 'down' 396) -> datetime: 397 """ 398 Round a datetime object to a multiple of a timedelta. 399 http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python 400 401 NOTE: This function strips timezone information! 402 403 Parameters 404 ---------- 405 dt: Optional[datetime], default None 406 If `None`, grab the current UTC datetime. 407 408 date_delta: Optional[timedelta], default None 409 If `None`, use a delta of 1 minute. 410 411 to: 'str', default 'down' 412 Available options are `'up'`, `'down'`, and `'closest'`. 413 414 Returns 415 ------- 416 A rounded `datetime` object. 417 418 Examples 419 -------- 420 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 421 datetime.datetime(2022, 1, 1, 12, 15) 422 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 423 datetime.datetime(2022, 1, 1, 12, 16) 424 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 425 datetime.datetime(2022, 1, 1, 12, 0) 426 >>> round_time( 427 ... datetime(2022, 1, 1, 12, 15, 57, 200), 428 ... timedelta(hours=1), 429 ... to = 'closest' 430 ... ) 431 datetime.datetime(2022, 1, 1, 12, 0) 432 >>> round_time( 433 ... datetime(2022, 1, 1, 12, 45, 57, 200), 434 ... datetime.timedelta(hours=1), 435 ... to = 'closest' 436 ... 
) 437 datetime.datetime(2022, 1, 1, 13, 0) 438 439 """ 440 if date_delta is None: 441 date_delta = timedelta(minutes=1) 442 round_to = date_delta.total_seconds() 443 if dt is None: 444 dt = datetime.now(timezone.utc).replace(tzinfo=None) 445 seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds 446 447 if seconds % round_to == 0 and dt.microsecond == 0: 448 rounding = (seconds + round_to / 2) // round_to * round_to 449 else: 450 if to == 'up': 451 rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to 452 elif to == 'down': 453 rounding = seconds // round_to * round_to 454 else: 455 rounding = (seconds + round_to / 2) // round_to * round_to 456 457 return dt + timedelta(0, rounding - seconds, - dt.microsecond) 458 459 460def timed_input( 461 seconds: int = 10, 462 timeout_message: str = "", 463 prompt: str = "", 464 icon: bool = False, 465 **kw 466 ) -> Union[str, None]: 467 """ 468 Accept user input only for a brief period of time. 469 470 Parameters 471 ---------- 472 seconds: int, default 10 473 The number of seconds to wait. 474 475 timeout_message: str, default '' 476 The message to print after the window has elapsed. 477 478 prompt: str, default '' 479 The prompt to print during the window. 480 481 icon: bool, default False 482 If `True`, print the configured input icon. 483 484 485 Returns 486 ------- 487 The input string entered by the user. 
488 489 """ 490 import signal, time 491 492 class TimeoutExpired(Exception): 493 """Raise this exception when the timeout is reached.""" 494 495 def alarm_handler(signum, frame): 496 raise TimeoutExpired 497 498 # set signal handler 499 signal.signal(signal.SIGALRM, alarm_handler) 500 signal.alarm(seconds) # produce SIGALRM in `timeout` seconds 501 502 try: 503 return input(prompt) 504 except TimeoutExpired: 505 return None 506 except (EOFError, RuntimeError): 507 try: 508 print(prompt) 509 time.sleep(seconds) 510 except TimeoutExpired: 511 return None 512 finally: 513 signal.alarm(0) # cancel alarm 514 515 516 517 518 519def replace_pipes_in_dict( 520 pipes : Optional[PipesDict] = None, 521 func: 'function' = str, 522 debug: bool = False, 523 **kw 524 ) -> PipesDict: 525 """ 526 Replace the Pipes in a Pipes dict with the result of another function. 527 528 Parameters 529 ---------- 530 pipes: Optional[PipesDict], default None 531 The pipes dict to be processed. 532 533 func: Callable[[Any], Any], default str 534 The function to be applied to every pipe. 535 Defaults to the string constructor. 536 537 debug: bool, default False 538 Verbosity toggle. 539 540 541 Returns 542 ------- 543 A dictionary where every pipe is replaced with the output of a function. 544 545 """ 546 import copy 547 def change_dict(d : Dict[Any, Any], func : 'function') -> None: 548 for k, v in d.items(): 549 if isinstance(v, dict): 550 change_dict(v, func) 551 else: 552 d[k] = func(v) 553 554 if pipes is None: 555 from meerschaum import get_pipes 556 pipes = get_pipes(debug=debug, **kw) 557 558 result = copy.deepcopy(pipes) 559 change_dict(result, func) 560 return result 561 562def enforce_gevent_monkey_patch(): 563 """ 564 Check if gevent monkey patching is enabled, and if not, then apply patching. 
565 """ 566 from meerschaum.utils.packages import attempt_import 567 import socket 568 gevent, gevent_socket, gevent_monkey = attempt_import( 569 'gevent', 'gevent.socket', 'gevent.monkey' 570 ) 571 if not socket.socket is gevent_socket.socket: 572 gevent_monkey.patch_all() 573 574def is_valid_email(email: str) -> Union['re.Match', None]: 575 """ 576 Check whether a string is a valid email. 577 578 Parameters 579 ---------- 580 email: str 581 The string to be examined. 582 583 Returns 584 ------- 585 None if a string is not in email format, otherwise a `re.Match` object, which is truthy. 586 587 Examples 588 -------- 589 >>> is_valid_email('foo') 590 >>> is_valid_email('foo@foo.com') 591 <re.Match object; span=(0, 11), match='foo@foo.com'> 592 593 """ 594 import re 595 regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$' 596 return re.search(regex, email) 597 598 599def string_width(string: str, widest: bool = True) -> int: 600 """ 601 Calculate the width of a string, either by its widest or last line. 602 603 Parameters 604 ---------- 605 string: str: 606 The string to be examined. 607 608 widest: bool, default True 609 No longer used because `widest` is always assumed to be true. 610 611 Returns 612 ------- 613 An integer for the text's visual width. 614 615 Examples 616 -------- 617 >>> string_width('a') 618 1 619 >>> string_width('a\\nbc\\nd') 620 2 621 622 """ 623 def _widest(): 624 words = string.split('\n') 625 max_length = 0 626 for w in words: 627 length = len(w) 628 if length > max_length: 629 max_length = length 630 return max_length 631 632 return _widest() 633 634def _pyinstaller_traverse_dir( 635 directory: str, 636 ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), 637 include_dotfiles: bool = False 638 ) -> list: 639 """ 640 Recursively traverse a directory and return a list of its contents. 
641 """ 642 import os, pathlib 643 paths = [] 644 _directory = pathlib.Path(directory) 645 646 def _found_pattern(name: str): 647 for pattern in ignore_patterns: 648 if pattern.replace('/', os.path.sep) in str(name): 649 return True 650 return False 651 652 for root, dirs, files in os.walk(_directory): 653 _root = str(root)[len(str(_directory.parent)):] 654 if _root.startswith(os.path.sep): 655 _root = _root[len(os.path.sep):] 656 if _root.startswith('.') and not include_dotfiles: 657 continue 658 ### ignore certain patterns 659 if _found_pattern(_root): 660 continue 661 662 for filename in files: 663 if filename.startswith('.') and not include_dotfiles: 664 continue 665 path = os.path.join(root, filename) 666 if _found_pattern(path): 667 continue 668 669 _path = str(path)[len(str(_directory.parent)):] 670 if _path.startswith(os.path.sep): 671 _path = _path[len(os.path.sep):] 672 _path = os.path.sep.join(_path.split(os.path.sep)[:-1]) 673 674 paths.append((path, _path)) 675 return paths 676 677 678def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]: 679 """ 680 Recursively replace passwords in a dictionary. 681 682 Parameters 683 ---------- 684 d: Dict[str, Any] 685 The dictionary to search through. 686 687 replace_with: str, default '*' 688 The string to replace each character of the password with. 689 690 Returns 691 ------- 692 Another dictionary where values to the keys `'password'` 693 are replaced with `replace_with` (`'*'`). 
694 695 Examples 696 -------- 697 >>> replace_password({'a': 1}) 698 {'a': 1} 699 >>> replace_password({'password': '123'}) 700 {'password': '***'} 701 >>> replace_password({'nested': {'password': '123'}}) 702 {'nested': {'password': '***'}} 703 >>> replace_password({'password': '123'}, replace_with='!') 704 {'password': '!!!'} 705 706 """ 707 import copy 708 _d = copy.deepcopy(d) 709 for k, v in d.items(): 710 if isinstance(v, dict): 711 _d[k] = replace_password(v) 712 elif 'password' in str(k).lower(): 713 _d[k] = ''.join([replace_with for char in str(v)]) 714 elif str(k).lower() == 'uri': 715 from meerschaum.connectors.sql import SQLConnector 716 try: 717 uri_params = SQLConnector.parse_uri(v) 718 except Exception as e: 719 uri_params = None 720 if not uri_params: 721 continue 722 if not 'username' in uri_params or not 'password' in uri_params: 723 continue 724 _d[k] = v.replace( 725 uri_params['username'] + ':' + uri_params['password'], 726 uri_params['username'] + ':' + ''.join( 727 [replace_with for char in str(uri_params['password'])] 728 ) 729 ) 730 return _d 731 732 733def filter_arguments( 734 func: Callable[[Any], Any], 735 *args: Any, 736 **kwargs: Any 737) -> Tuple[Tuple[Any], Dict[str, Any]]: 738 """ 739 Filter out unsupported positional and keyword arguments. 740 741 Parameters 742 ---------- 743 func: Callable[[Any], Any] 744 The function to inspect. 745 746 *args: Any 747 Positional arguments to filter and pass to `func`. 748 749 **kwargs 750 Keyword arguments to filter and pass to `func`. 751 752 Returns 753 ------- 754 The `args` and `kwargs` accepted by `func`. 755 """ 756 args = filter_positionals(func, *args) 757 kwargs = filter_keywords(func, **kwargs) 758 return args, kwargs 759 760 761def filter_keywords( 762 func: Callable[[Any], Any], 763 **kw: Any 764) -> Dict[str, Any]: 765 """ 766 Filter out unsupported keyword arguments. 767 768 Parameters 769 ---------- 770 func: Callable[[Any], Any] 771 The function to inspect. 
772 773 **kw: Any 774 The arguments to be filtered and passed into `func`. 775 776 Returns 777 ------- 778 A dictionary of keyword arguments accepted by `func`. 779 780 Examples 781 -------- 782 ```python 783 >>> def foo(a=1, b=2): 784 ... return a * b 785 >>> filter_keywords(foo, a=2, b=4, c=6) 786 {'a': 2, 'b': 4} 787 >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6})) 788 8 789 ``` 790 791 """ 792 import inspect 793 func_params = inspect.signature(func).parameters 794 ### If the function has a **kw method, skip filtering. 795 for param, _type in func_params.items(): 796 if '**' in str(_type): 797 return kw 798 return {k: v for k, v in kw.items() if k in func_params} 799 800 801def filter_positionals( 802 func: Callable[[Any], Any], 803 *args: Any 804) -> Tuple[Any]: 805 """ 806 Filter out unsupported positional arguments. 807 808 Parameters 809 ---------- 810 func: Callable[[Any], Any] 811 The function to inspect. 812 813 *args: Any 814 The arguments to be filtered and passed into `func`. 815 NOTE: If the function signature expects more arguments than provided, 816 the missing slots will be filled with `None`. 817 818 Returns 819 ------- 820 A tuple of positional arguments accepted by `func`. 821 822 Examples 823 -------- 824 ```python 825 >>> def foo(a, b): 826 ... return a * b 827 >>> filter_positionals(foo, 2, 4, 6) 828 (2, 4) 829 >>> foo(*filter_positionals(foo, 2, 4, 6)) 830 8 831 ``` 832 833 """ 834 import inspect 835 from meerschaum.utils.warnings import warn 836 func_params = inspect.signature(func).parameters 837 acceptable_args: List[Any] = [] 838 839 def _warn_invalids(_num_invalid): 840 if _num_invalid > 0: 841 warn( 842 "Too few arguments were provided. 
" 843 + f"{_num_invalid} argument" 844 + ('s have ' if _num_invalid != 1 else " has ") 845 + " been filled with `None`.", 846 ) 847 848 num_invalid: int = 0 849 for i, (param, val) in enumerate(func_params.items()): 850 if '=' in str(val) or '*' in str(val): 851 _warn_invalids(num_invalid) 852 return tuple(acceptable_args) 853 854 try: 855 acceptable_args.append(args[i]) 856 except IndexError: 857 acceptable_args.append(None) 858 num_invalid += 1 859 860 _warn_invalids(num_invalid) 861 return tuple(acceptable_args) 862 863 864def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]: 865 """ 866 Convert an ordered dict to a dict. 867 Does not mutate the original OrderedDict. 868 """ 869 from collections import OrderedDict 870 _d = dict(od) 871 for k, v in od.items(): 872 if isinstance(v, OrderedDict) or ( 873 issubclass(type(v), OrderedDict) 874 ): 875 _d[k] = dict_from_od(v) 876 return _d 877 878def remove_ansi(s: str) -> str: 879 """ 880 Remove ANSI escape characters from a string. 881 882 Parameters 883 ---------- 884 s: str: 885 The string to be cleaned. 886 887 Returns 888 ------- 889 A string with the ANSI characters removed. 890 891 Examples 892 -------- 893 >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m") 894 'Hello, World!' 895 896 """ 897 import re 898 return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s) 899 900 901def get_connector_labels( 902 *types: str, 903 search_term: str = '', 904 ignore_exact_match = True, 905 _additional_options: Optional[List[str]] = None, 906) -> List[str]: 907 """ 908 Read connector labels from the configuration dictionary. 909 910 Parameters 911 ---------- 912 *types: str 913 The connector types. 914 If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`. 915 916 search_term: str, default '' 917 A filter on the connectors' labels. 918 919 ignore_exact_match: bool, default True 920 If `True`, skip a connector if the search_term is an exact match. 
921 922 Returns 923 ------- 924 A list of the keys of defined connectors. 925 926 """ 927 from meerschaum.config import get_config 928 connectors = get_config('meerschaum', 'connectors') 929 930 _types = list(types) 931 if len(_types) == 0: 932 _types = list(connectors.keys()) + ['plugin'] 933 934 conns = [] 935 for t in _types: 936 if t == 'plugin': 937 from meerschaum.plugins import get_data_plugins 938 conns += [ 939 f'{t}:' + plugin.module.__name__.split('.')[-1] 940 for plugin in get_data_plugins() 941 ] 942 continue 943 conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ] 944 945 if _additional_options: 946 conns += _additional_options 947 948 possibilities = [ 949 c 950 for c in conns 951 if c.startswith(search_term) 952 and c != ( 953 search_term if ignore_exact_match else '' 954 ) 955 ] 956 return sorted(possibilities) 957 958 959def json_serialize_datetime(dt: datetime) -> Union[str, None]: 960 """ 961 Serialize a datetime object into JSON (ISO format string). 962 963 Examples 964 -------- 965 >>> import json 966 >>> from datetime import datetime 967 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 968 '{"a": "2022-01-01T00:00:00Z"}' 969 970 """ 971 if not isinstance(dt, datetime): 972 return None 973 tz_suffix = 'Z' if dt.tzinfo is None else '' 974 return dt.isoformat() + tz_suffix 975 976 977def wget( 978 url: str, 979 dest: Optional[Union[str, 'pathlib.Path']] = None, 980 headers: Optional[Dict[str, Any]] = None, 981 color: bool = True, 982 debug: bool = False, 983 **kw: Any 984) -> 'pathlib.Path': 985 """ 986 Mimic `wget` with `requests`. 987 988 Parameters 989 ---------- 990 url: str 991 The URL to the resource to be downloaded. 992 993 dest: Optional[Union[str, pathlib.Path]], default None 994 The destination path of the downloaded file. 995 If `None`, save to the current directory. 996 997 color: bool, default True 998 If `debug` is `True`, print color output. 
999 1000 debug: bool, default False 1001 Verbosity toggle. 1002 1003 Returns 1004 ------- 1005 The path to the downloaded file. 1006 1007 """ 1008 from meerschaum.utils.warnings import warn, error 1009 from meerschaum.utils.debug import dprint 1010 import os, pathlib, re, urllib.request 1011 if headers is None: 1012 headers = {} 1013 request = urllib.request.Request(url, headers=headers) 1014 if not color: 1015 dprint = print 1016 if debug: 1017 dprint(f"Downloading from '{url}'...") 1018 try: 1019 response = urllib.request.urlopen(request) 1020 except Exception as e: 1021 import ssl 1022 ssl._create_default_https_context = ssl._create_unverified_context 1023 try: 1024 response = urllib.request.urlopen(request) 1025 except Exception as _e: 1026 print(_e) 1027 response = None 1028 if response is None or response.code != 200: 1029 error_msg = f"Failed to download from '{url}'." 1030 if color: 1031 error(error_msg) 1032 else: 1033 print(error_msg) 1034 import sys 1035 sys.exit(1) 1036 1037 d = response.headers.get('content-disposition', None) 1038 fname = ( 1039 re.findall("filename=(.+)", d)[0].strip('"') if d is not None 1040 else url.split('/')[-1] 1041 ) 1042 1043 if dest is None: 1044 dest = pathlib.Path(os.path.join(os.getcwd(), fname)) 1045 elif isinstance(dest, str): 1046 dest = pathlib.Path(dest) 1047 1048 with open(dest, 'wb') as f: 1049 f.write(response.fp.read()) 1050 1051 if debug: 1052 dprint(f"Downloaded file '{dest}'.") 1053 1054 return dest 1055 1056 1057def async_wrap(func): 1058 """ 1059 Run a synchronous function as async. 
1060 https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn 1061 """ 1062 import asyncio 1063 from functools import wraps, partial 1064 1065 @wraps(func) 1066 async def run(*args, loop=None, executor=None, **kwargs): 1067 if loop is None: 1068 loop = asyncio.get_event_loop() 1069 pfunc = partial(func, *args, **kwargs) 1070 return await loop.run_in_executor(executor, pfunc) 1071 return run 1072 1073 1074def debug_trace(browser: bool = True): 1075 """ 1076 Open a web-based debugger to trace the execution of the program. 1077 1078 This is an alias import for `meerschaum.utils.debug.debug_trace`. 1079 """ 1080 from meerschaum.utils.debug import trace 1081 trace(browser=browser) 1082 1083 1084def items_str( 1085 items: List[Any], 1086 quotes: bool = True, 1087 quote_str: str = "'", 1088 commas: bool = True, 1089 comma_str: str = ',', 1090 and_: bool = True, 1091 and_str: str = 'and', 1092 oxford_comma: bool = True, 1093 spaces: bool = True, 1094 space_str = ' ', 1095) -> str: 1096 """ 1097 Return a formatted string if list items separated by commas. 1098 1099 Parameters 1100 ---------- 1101 items: [List[Any]] 1102 The items to be printed as an English list. 1103 1104 quotes: bool, default True 1105 If `True`, wrap items in quotes. 1106 1107 quote_str: str, default "'" 1108 If `quotes` is `True`, prepend and append each item with this string. 1109 1110 and_: bool, default True 1111 If `True`, include the word 'and' before the final item in the list. 1112 1113 and_str: str, default 'and' 1114 If `and_` is True, insert this string where 'and' normally would in and English list. 1115 1116 oxford_comma: bool, default True 1117 If `True`, include the Oxford Comma (comma before the final 'and'). 1118 Only applies when `and_` is `True`. 1119 1120 spaces: bool, default True 1121 If `True`, separate items with `space_str` 1122 1123 space_str: str, default ' ' 1124 If `spaces` is `True`, separate items with this string. 
1125 1126 Returns 1127 ------- 1128 A string of the items as an English list. 1129 1130 Examples 1131 -------- 1132 >>> items_str([1,2,3]) 1133 "'1', '2', and '3'" 1134 >>> items_str([1,2,3], quotes=False) 1135 '1, 2, and 3' 1136 >>> items_str([1,2,3], and_=False) 1137 "'1', '2', '3'" 1138 >>> items_str([1,2,3], spaces=False, and_=False) 1139 "'1','2','3'" 1140 >>> items_str([1,2,3], oxford_comma=False) 1141 "'1', '2' and '3'" 1142 >>> items_str([1,2,3], quote_str=":") 1143 ':1:, :2:, and :3:' 1144 >>> items_str([1,2,3], and_str="or") 1145 "'1', '2', or '3'" 1146 >>> items_str([1,2,3], space_str="_") 1147 "'1',_'2',_and_'3'" 1148 1149 """ 1150 if not items: 1151 return '' 1152 1153 q = quote_str if quotes else '' 1154 s = space_str if spaces else '' 1155 a = and_str if and_ else '' 1156 c = comma_str if commas else '' 1157 1158 if len(items) == 1: 1159 return q + str(list(items)[0]) + q 1160 1161 if len(items) == 2: 1162 return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q 1163 1164 sep = q + c + s + q 1165 output = q + sep.join(str(i) for i in items[:-1]) + q 1166 if oxford_comma: 1167 output += c 1168 output += s + a + (s if and_ else '') + q + str(items[-1]) + q 1169 return output 1170 1171 1172def interval_str(delta: Union[timedelta, int]) -> str: 1173 """ 1174 Return a human-readable string for a `timedelta` (or `int` minutes). 1175 1176 Parameters 1177 ---------- 1178 delta: Union[timedelta, int] 1179 The interval to print. If `delta` is an integer, assume it corresponds to minutes. 1180 1181 Returns 1182 ------- 1183 A formatted string, fit for human eyes. 
1184 """ 1185 from meerschaum.utils.packages import attempt_import 1186 humanfriendly = attempt_import('humanfriendly') 1187 delta_seconds = ( 1188 delta.total_seconds() 1189 if isinstance(delta, timedelta) 1190 else (delta * 60) 1191 ) 1192 return humanfriendly.format_timespan(delta_seconds) 1193 1194 1195def is_docker_available() -> bool: 1196 """Check if we can connect to the Docker engine.""" 1197 import subprocess 1198 try: 1199 has_docker = subprocess.call( 1200 ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1201 ) == 0 1202 except Exception as e: 1203 has_docker = False 1204 return has_docker 1205 1206 1207def is_android() -> bool: 1208 """Return `True` if the current platform is Android.""" 1209 import sys 1210 return hasattr(sys, 'getandroidapilevel') 1211 1212 1213def is_bcp_available() -> bool: 1214 """Check if the MSSQL `bcp` utility is installed.""" 1215 import subprocess 1216 1217 try: 1218 has_bcp = subprocess.call( 1219 ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1220 ) == 0 1221 except Exception as e: 1222 has_bcp = False 1223 return has_bcp 1224 1225 1226def is_systemd_available() -> bool: 1227 """Check if running on systemd.""" 1228 import subprocess 1229 try: 1230 has_systemctl = subprocess.call( 1231 ['systemctl', '-h'], 1232 stdout=subprocess.DEVNULL, 1233 stderr=subprocess.STDOUT, 1234 ) == 0 1235 except Exception: 1236 has_systemctl = False 1237 return has_systemctl 1238 1239def get_last_n_lines(file_name: str, N: int): 1240 """ 1241 https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/ 1242 """ 1243 import os 1244 # Create an empty list to keep the track of last N lines 1245 list_of_lines = [] 1246 # Open file for reading in binary mode 1247 with open(file_name, 'rb') as read_obj: 1248 # Move the cursor to the end of the file 1249 read_obj.seek(0, os.SEEK_END) 1250 # Create a buffer to keep the last read line 1251 buffer = bytearray() 1252 # Get the current position 
of pointer i.e eof 1253 pointer_location = read_obj.tell() 1254 # Loop till pointer reaches the top of the file 1255 while pointer_location >= 0: 1256 # Move the file pointer to the location pointed by pointer_location 1257 read_obj.seek(pointer_location) 1258 # Shift pointer location by -1 1259 pointer_location = pointer_location -1 1260 # read that byte / character 1261 new_byte = read_obj.read(1) 1262 # If the read byte is new line character then it means one line is read 1263 if new_byte == b'\n': 1264 # Save the line in list of lines 1265 list_of_lines.append(buffer.decode()[::-1]) 1266 # If the size of list reaches N, then return the reversed list 1267 if len(list_of_lines) == N: 1268 return list(reversed(list_of_lines)) 1269 # Reinitialize the byte array to save next line 1270 buffer = bytearray() 1271 else: 1272 # If last read character is not eol then add it in buffer 1273 buffer.extend(new_byte) 1274 # As file is read completely, if there is still data in buffer, then its first line. 1275 if len(buffer) > 0: 1276 list_of_lines.append(buffer.decode()[::-1]) 1277 # return the reversed list 1278 return list(reversed(list_of_lines)) 1279 1280 1281def tail(f, n, offset=None): 1282 """ 1283 https://stackoverflow.com/a/692616/9699829 1284 1285 Reads n lines from f with an offset of offset lines. The return 1286 value is a tuple in the form ``(lines, has_more)`` where `has_more` is 1287 an indicator that is `True` if there are more lines in the file. 1288 """ 1289 avg_line_length = 74 1290 to_read = n + (offset or 0) 1291 1292 while True: 1293 try: 1294 f.seek(-(avg_line_length * to_read), 2) 1295 except IOError: 1296 # woops. 
apparently file is smaller than what we want 1297 # to step back, go to the beginning instead 1298 f.seek(0) 1299 pos = f.tell() 1300 lines = f.read().splitlines() 1301 if len(lines) >= to_read or pos == 0: 1302 return lines[-to_read:offset and -offset or None], \ 1303 len(lines) > to_read or pos > 0 1304 avg_line_length *= 1.3 1305 1306 1307def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str: 1308 """ 1309 Remove characters from each section of a string until the length is within the limit. 1310 1311 Parameters 1312 ---------- 1313 item: str 1314 The item name to be truncated. 1315 1316 delimeter: str, default '_' 1317 Split `item` by this string into several sections. 1318 1319 max_len: int, default 128 1320 The max acceptable length of the truncated version of `item`. 1321 1322 Returns 1323 ------- 1324 The truncated string. 1325 1326 Examples 1327 -------- 1328 >>> truncate_string_sections('abc_def_ghi', max_len=10) 1329 'ab_de_gh' 1330 1331 """ 1332 if len(item) < max_len: 1333 return item 1334 1335 def _shorten(s: str) -> str: 1336 return s[:-1] if len(s) > 1 else s 1337 1338 sections = list(enumerate(item.split('_'))) 1339 sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1]))) 1340 available_chars = max_len - len(sections) 1341 1342 _sections = [(i, s) for i, s in sorted_sections] 1343 _sections_len = sum([len(s) for i, s in _sections]) 1344 _old_sections_len = _sections_len 1345 while _sections_len > available_chars: 1346 _sections = [(i, _shorten(s)) for i, s in _sections] 1347 _old_sections_len = _sections_len 1348 _sections_len = sum([len(s) for i, s in _sections]) 1349 if _old_sections_len == _sections_len: 1350 raise Exception(f"String could not be truncated: '{item}'") 1351 1352 new_sections = sorted(_sections, key=lambda x: x[0]) 1353 return delimeter.join([s for i, s in new_sections]) 1354 1355 1356def separate_negation_values( 1357 vals: Union[List[str], Tuple[str]], 1358 negation_prefix: 
Optional[str] = None, 1359) -> Tuple[List[str], List[str]]: 1360 """ 1361 Separate the negated values from the positive ones. 1362 Return two lists: positive and negative values. 1363 1364 Parameters 1365 ---------- 1366 vals: Union[List[str], Tuple[str]] 1367 A list of strings to parse. 1368 1369 negation_prefix: Optional[str], default None 1370 Include values that start with this string in the second list. 1371 If `None`, use the system default (`_`). 1372 """ 1373 if negation_prefix is None: 1374 from meerschaum.config.static import STATIC_CONFIG 1375 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 1376 _in_vals, _ex_vals = [], [] 1377 for v in vals: 1378 if str(v).startswith(negation_prefix): 1379 _ex_vals.append(str(v)[len(negation_prefix):]) 1380 else: 1381 _in_vals.append(v) 1382 1383 return _in_vals, _ex_vals 1384 1385 1386def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]: 1387 """ 1388 Translate a params dictionary into lists of include- and exclude-values. 1389 1390 Parameters 1391 ---------- 1392 params: Optional[Dict[str, Any]] 1393 A params query dictionary. 1394 1395 Returns 1396 ------- 1397 A dictionary mapping keys to a tuple of lists for include and exclude values. 1398 1399 Examples 1400 -------- 1401 >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']}) 1402 {'a': (['b', 'c', 'e'], ['d', 'f'])} 1403 """ 1404 if not params: 1405 return {} 1406 return { 1407 col: separate_negation_values( 1408 ( 1409 val 1410 if isinstance(val, (list, tuple)) 1411 else [val] 1412 ) 1413 ) 1414 for col, val in params.items() 1415 } 1416 1417 1418def flatten_list(list_: List[Any]) -> List[Any]: 1419 """ 1420 Recursively flatten a list. 
1421 """ 1422 for item in list_: 1423 if isinstance(item, list): 1424 yield from flatten_list(item) 1425 else: 1426 yield item 1427 1428 1429def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]: 1430 """ 1431 Parse a string containing the text to be passed into a function 1432 and return a tuple of args, kwargs. 1433 1434 Parameters 1435 ---------- 1436 args_str: str 1437 The contents of the function parameter (as a string). 1438 1439 Returns 1440 ------- 1441 A tuple of args (tuple) and kwargs (dict[str, Any]). 1442 1443 Examples 1444 -------- 1445 >>> parse_arguments_str('123, 456, foo=789, bar="baz"') 1446 (123, 456), {'foo': 789, 'bar': 'baz'} 1447 """ 1448 import ast 1449 args = [] 1450 kwargs = {} 1451 1452 for part in args_str.split(','): 1453 if '=' in part: 1454 key, val = part.split('=', 1) 1455 kwargs[key.strip()] = ast.literal_eval(val) 1456 else: 1457 args.append(ast.literal_eval(part.strip())) 1458 1459 return tuple(args), kwargs 1460 1461 1462def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple: 1463 """ 1464 Wrap around `pathlib.Path.symlink_to`, but add support for Windows. 1465 1466 Parameters 1467 ---------- 1468 src_path: pathlib.Path 1469 The source path. 1470 1471 dest_path: pathlib.Path 1472 The destination path. 1473 1474 Returns 1475 ------- 1476 A SuccessTuple indicating success. 1477 """ 1478 if dest_path.exists() and dest_path.resolve() == src_path.resolve(): 1479 return True, "Symlink already exists." 1480 try: 1481 dest_path.symlink_to(src_path) 1482 success = True 1483 except Exception as e: 1484 success = False 1485 msg = str(e) 1486 if success: 1487 return success, "Success" 1488 1489 ### Failed to create a symlink. 1490 ### If we're not on Windows, return an error. 1491 import platform 1492 if platform.system() != 'Windows': 1493 return success, msg 1494 1495 try: 1496 import _winapi 1497 except ImportError: 1498 return False, "Unable to import _winapi." 
1499 1500 if src_path.is_dir(): 1501 try: 1502 _winapi.CreateJunction(str(src_path), str(dest_path)) 1503 except Exception as e: 1504 return False, str(e) 1505 return True, "Success" 1506 1507 ### Last resort: copy the file on Windows. 1508 import shutil 1509 try: 1510 shutil.copy(src_path, dest_path) 1511 except Exception as e: 1512 return False, str(e) 1513 1514 return True, "Success" 1515 1516 1517def is_symlink(path: pathlib.Path) -> bool: 1518 """ 1519 Wrap `path.is_symlink()` but add support for Windows junctions. 1520 """ 1521 if path.is_symlink(): 1522 return True 1523 import platform, os 1524 if platform.system() != 'Windows': 1525 return False 1526 try: 1527 return bool(os.readlink(path)) 1528 except OSError: 1529 return False 1530 1531 1532def parametrized(dec): 1533 """ 1534 A meta-decorator for allowing other decorator functions to have parameters. 1535 1536 https://stackoverflow.com/a/26151604/9699829 1537 """ 1538 def layer(*args, **kwargs): 1539 def repl(f): 1540 return dec(f, *args, **kwargs) 1541 return repl 1542 return layer 1543 1544 1545def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None: 1546 """ 1547 Safely extract a TAR file to a give directory. 1548 This defends against CVE-2007-4559. 1549 1550 Parameters 1551 ---------- 1552 tarf: file 1553 The TAR file opened with `tarfile.open(path, 'r:gz')`. 1554 1555 output_dir: Union[str, pathlib.Path] 1556 The output directory. 
1557 """ 1558 import os 1559 1560 def is_within_directory(directory, target): 1561 abs_directory = os.path.abspath(directory) 1562 abs_target = os.path.abspath(target) 1563 prefix = os.path.commonprefix([abs_directory, abs_target]) 1564 return prefix == abs_directory 1565 1566 def safe_extract(tar, path=".", members=None, *, numeric_owner=False): 1567 for member in tar.getmembers(): 1568 member_path = os.path.join(path, member.name) 1569 if not is_within_directory(path, member_path): 1570 raise Exception("Attempted Path Traversal in Tar File") 1571 1572 tar.extractall(path=path, members=members, numeric_owner=numeric_owner) 1573 1574 return safe_extract(tarf, output_dir) 1575 1576 1577################## 1578# Legacy imports # 1579################## 1580 1581def choose_subaction(*args, **kwargs) -> Any: 1582 """ 1583 Placeholder function to prevent breaking legacy behavior. 1584 See `meerschaum.actions.choose_subaction`. 1585 """ 1586 from meerschaum.actions import choose_subaction as _choose_subactions 1587 return _choose_subactions(*args, **kwargs) 1588 1589 1590def print_options(*args, **kwargs) -> None: 1591 """ 1592 Placeholder function to prevent breaking legacy behavior. 1593 See `meerschaum.utils.formatting.print_options`. 1594 """ 1595 from meerschaum.utils.formatting import print_options as _print_options 1596 return _print_options(*args, **kwargs) 1597 1598 1599def to_pandas_dtype(*args, **kwargs) -> Any: 1600 """ 1601 Placeholder function to prevent breaking legacy behavior. 1602 See `meerschaum.utils.dtypes.to_pandas_dtype`. 1603 """ 1604 from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype 1605 return _to_pandas_dtype(*args, **kwargs) 1606 1607 1608def filter_unseen_df(*args, **kwargs) -> Any: 1609 """ 1610 Placeholder function to prevent breaking legacy behavior. 1611 See `meerschaum.utils.dataframe.filter_unseen_df`. 
1612 """ 1613 from meerschaum.utils.dataframe import filter_unseen_df as real_function 1614 return real_function(*args, **kwargs) 1615 1616 1617def add_missing_cols_to_df(*args, **kwargs) -> Any: 1618 """ 1619 Placeholder function to prevent breaking legacy behavior. 1620 See `meerschaum.utils.dataframe.add_missing_cols_to_df`. 1621 """ 1622 from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function 1623 return real_function(*args, **kwargs) 1624 1625 1626def parse_df_datetimes(*args, **kwargs) -> Any: 1627 """ 1628 Placeholder function to prevent breaking legacy behavior. 1629 See `meerschaum.utils.dataframe.parse_df_datetimes`. 1630 """ 1631 from meerschaum.utils.dataframe import parse_df_datetimes as real_function 1632 return real_function(*args, **kwargs) 1633 1634 1635def df_from_literal(*args, **kwargs) -> Any: 1636 """ 1637 Placeholder function to prevent breaking legacy behavior. 1638 See `meerschaum.utils.dataframe.df_from_literal`. 1639 """ 1640 from meerschaum.utils.dataframe import df_from_literal as real_function 1641 return real_function(*args, **kwargs) 1642 1643 1644def get_json_cols(*args, **kwargs) -> Any: 1645 """ 1646 Placeholder function to prevent breaking legacy behavior. 1647 See `meerschaum.utils.dataframe.get_json_cols`. 1648 """ 1649 from meerschaum.utils.dataframe import get_json_cols as real_function 1650 return real_function(*args, **kwargs) 1651 1652 1653def get_unhashable_cols(*args, **kwargs) -> Any: 1654 """ 1655 Placeholder function to prevent breaking legacy behavior. 1656 See `meerschaum.utils.dataframe.get_unhashable_cols`. 1657 """ 1658 from meerschaum.utils.dataframe import get_unhashable_cols as real_function 1659 return real_function(*args, **kwargs) 1660 1661 1662def enforce_dtypes(*args, **kwargs) -> Any: 1663 """ 1664 Placeholder function to prevent breaking legacy behavior. 1665 See `meerschaum.utils.dataframe.enforce_dtypes`. 
1666 """ 1667 from meerschaum.utils.dataframe import enforce_dtypes as real_function 1668 return real_function(*args, **kwargs) 1669 1670 1671def get_datetime_bound_from_df(*args, **kwargs) -> Any: 1672 """ 1673 Placeholder function to prevent breaking legacy behavior. 1674 See `meerschaum.utils.dataframe.get_datetime_bound_from_df`. 1675 """ 1676 from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function 1677 return real_function(*args, **kwargs) 1678 1679 1680def df_is_chunk_generator(*args, **kwargs) -> Any: 1681 """ 1682 Placeholder function to prevent breaking legacy behavior. 1683 See `meerschaum.utils.dataframe.df_is_chunk_generator`. 1684 """ 1685 from meerschaum.utils.dataframe import df_is_chunk_generator as real_function 1686 return real_function(*args, **kwargs) 1687 1688 1689def choices_docstring(*args, **kwargs) -> Any: 1690 """ 1691 Placeholder function to prevent breaking legacy behavior. 1692 See `meerschaum.actions.choices_docstring`. 1693 """ 1694 from meerschaum.actions import choices_docstring as real_function 1695 return real_function(*args, **kwargs) 1696 1697 1698def _get_subaction_names(*args, **kwargs) -> Any: 1699 """ 1700 Placeholder function to prevent breaking legacy behavior. 1701 See `meerschaum.actions._get_subaction_names`. 1702 """ 1703 from meerschaum.actions import _get_subaction_names as real_function 1704 return real_function(*args, **kwargs) 1705 1706 1707_current_module = sys.modules[__name__] 1708__all__ = tuple( 1709 name 1710 for name, obj in globals().items() 1711 if callable(obj) 1712 and name not in __pdoc__ 1713 and getattr(obj, '__module__', None) == _current_module.__name__ 1714)
def add_method_to_class(
    func: Callable[[Any], Any],
    class_def: 'Class',
    method_name: Optional[str] = None,
    keep_self: Optional[bool] = None,
) -> Callable[[Any], Any]:
    """
    Attach `func` to `class_def` as an attribute (typically a method).

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to attach.

    class_def: Class
        The class (or any other object) that receives the new attribute.

    method_name: Optional[str], default None
        The attribute name to assign. If `None`, use `func.__name__`.

    keep_self: Optional[bool], default None
        Controls whether the attached callable sees the leading `self` argument.
        If `False`, or if `None` while `class_def` is a class, `func` is wrapped
        so that the first positional argument is discarded before calling `func`.
        In every other case `func` is attached exactly as given.

    Returns
    -------
    The original `func` (it is returned unchanged).

    """
    from functools import wraps

    @wraps(func)
    def _discard_self(self, *args, **kw):
        # The first positional argument (the bound instance) is dropped on purpose.
        return func(*args, **kw)

    attr_name = func.__name__ if method_name is None else method_name

    # Drop `self` when explicitly asked to, or by default when targeting a class.
    drop_self = (keep_self is False) or (keep_self is None and isinstance(class_def, type))

    if drop_self:
        setattr(class_def, attr_name, _discard_self)
    else:
        setattr(class_def, attr_name, func)

    return func
Add function `func` to class `class_def`.
Parameters
- func (Callable[[Any], Any]): Function to be added as a method of the class
- class_def (Class): Class to be modified.
- method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
- The function object `func` that was passed in (it is returned unchanged).
def generate_password(length: int = 12) -> str:
    """
    Build a random password using the `secrets` module.

    Parameters
    ----------
    length: int, default 12
        The number of characters in the password.

    Returns
    -------
    A random string of `length` ASCII letters (upper- and lowercase only).

    """
    import secrets
    import string

    alphabet = string.ascii_letters
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return ''.join(picked)
Generate a secure password of given length.
Parameters
- length (int, default 12): The length of the password.
Returns
- A random password string.
def is_int(s: str) -> bool:
    """
    Check whether a string represents an integral number.

    Parameters
    ----------
    s: str
        The string to be checked.

    Returns
    -------
    `True` if `float(s)` succeeds and the resulting value has no fractional part
    (so both `'1'` and `'1.0'` qualify), otherwise `False`.

    """
    try:
        as_float = float(s)
    except Exception:
        # Anything `float()` rejects (bad text, `None`, ...) is simply "not an int".
        return False

    return as_float.is_integer()
Check if string is an int.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string parses as a number with an integral value (e.g. '1' or '1.0').
def string_to_dict(
    params_string: str
) -> Dict[str, Any]:
    """
    Convert a parameters string into a dictionary.

    Strings that begin with `{` (optionally wrapped in one extra character on each
    side, e.g. a stray single quote) are parsed as JSON. Anything else is treated as
    comma-separated `key:value` items, where additional colons create nested dictionaries.

    Parameters
    ----------
    params_string: str
        The string to be parsed.

    Returns
    -------
    The parsed dictionary.

    Examples
    --------
    >>> string_to_dict("a:1,b:2")
    {'a': 1, 'b': 2}
    >>> string_to_dict('{"a": 1, "b": 2}')
    {'a': 1, 'b': 2}

    """
    if params_string == "":
        return {}

    import ast
    import json

    def _literal_or_text(text):
        # Interpret `text` as a Python literal when possible, otherwise keep it as a string.
        try:
            return ast.literal_eval(text)
        except Exception:
            return str(text)

    ### Kind of a weird edge case.
    ### In the generated compose file, there is some weird escaping happening,
    ### so the string to be parsed starts and ends with a single quote.
    wrapped_json = (
        isinstance(params_string, str)
        and len(params_string) > 4
        and params_string[1] == "{"
        and params_string[-2] == "}"
    )
    if wrapped_json:
        return json.loads(params_string[1:-1])

    if str(params_string).startswith('{'):
        return json.loads(params_string)

    parsed = {}
    for item in params_string.split(","):
        *key_path, raw_value = item.split(":")
        value = _literal_or_text(raw_value)

        ### Walk (and create) the nested dictionaries for every key but the last.
        cursor = parsed
        for raw_key in key_path[:-1]:
            key = _literal_or_text(raw_key)
            if key not in cursor:
                cursor[key] = {}
            cursor = cursor[key]

        ### The final key is used verbatim (it is not evaluated as a literal).
        cursor[key_path[-1]] = value

    return parsed
Parse a string into a dictionary.
If the string begins with '{', parse as JSON. Otherwise use simple parsing.
Parameters
- params_string (str): The string to be parsed.
Returns
- The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2")
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
def parse_config_substitution(
    value: str,
    leading_key: str = 'MRSM',
    begin_key: str = '{',
    end_key: str = '}',
    delimeter: str = ':'
) -> List[Any]:
    """
    Parse Meerschaum substitution syntax.
    E.g. MRSM{value1:value2} => ['value1', 'value2']
    NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.

    Parameters
    ----------
    value: str
        The candidate string, e.g. `'MRSM{value1:value2}'`.

    leading_key: str, default 'MRSM'
        The prefix which marks a substitution.

    begin_key: str, default '{'
        The opening token which must directly follow `leading_key`.

    end_key: str, default '}'
        The closing token which must end `value`.

    delimeter: str, default ':'
        The separator between the keys inside the substitution.

    Returns
    -------
    The list of keys between `begin_key` and `end_key`.
    If `value` is not a string of the form `<leading_key><begin_key>...<end_key>`,
    `value` itself is returned unchanged.

    Examples
    --------
    >>> parse_config_substitution('MRSM{value1:value2}')
    ['value1', 'value2']
    >>> parse_config_substitution('plain text')
    'plain text'

    """
    if not isinstance(value, str):
        return value

    prefix = leading_key + begin_key
    is_substitution = (
        # The length check keeps the prefix and suffix from overlapping.
        len(value) >= len(prefix) + len(end_key)
        and value.startswith(prefix)
        and value.endswith(end_key)
    )
    if not is_substitution:
        return value

    inner = value[len(prefix):len(value) - len(end_key)]
    return inner.split(delimeter)
Parse Meerschaum substitution syntax
E.g. MRSM{value1:value2} => ['value1', 'value2']
NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`.
def edit_file(
    path: Union['pathlib.Path', str],
    default_editor: str = 'pyvim',
    debug: bool = False
) -> bool:
    """
    Launch an editor on a file and report whether it exited cleanly.

    The editor named by `$EDITOR` is tried first (or `default_editor` if the variable is unset).
    If launching it raises an exception, fall back to running the `pyvim` package.

    Parameters
    ----------
    path: Union[pathlib.Path, str]
        The path to the file to be edited.

    default_editor: str, default 'pyvim'
        The editor command to run when `$EDITOR` is not set.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if the editor process returned an exit code of 0, otherwise `False`.
    """
    import os
    from subprocess import call
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import run_python_package, attempt_import, package_venv

    try:
        editor = os.environ.get('EDITOR', default_editor)
        if debug:
            dprint(f"Opening file '{path}' with editor '{editor}'...")
        command = [editor, path]
        exit_code = call(command)
    except Exception as e:
        ### The system editor could not be started, so run pyvim as a Python package instead.
        if debug:
            dprint(str(e))
            dprint('Failed to open file with system editor. Falling back to pyvim...')
        pyvim = attempt_import('pyvim', lazy=False)
        exit_code = run_python_package(
            'pyvim',
            [path],
            venv=package_venv(pyvim),
            debug=debug,
        )

    return exit_code == 0
Open a file for editing.
Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`.
Parameters
- path (Union[pathlib.Path, str]): The path to the file to be edited.
- default_editor (str, default 'pyvim'): If `$EDITOR` is not set, use this instead. If `pyvim` is not installed, it will install it from PyPI.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating the file was successfully edited.
def is_pipe_registered(
    pipe: mrsm.Pipe,
    pipes: PipesDict,
    debug: bool = False
) -> bool:
    """
    Determine whether a pipe's keys are present in a nested pipes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `connector_keys`, `metric_key`, and `location_key` are looked up.

    pipes: PipesDict
        The dictionary to search, nested as connector keys -> metric key -> location key.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether all three keys are found at their respective levels.
    """
    from meerschaum.utils.debug import dprint

    ck = pipe.connector_keys
    mk = pipe.metric_key
    lk = pipe.location_key
    if debug:
        dprint(f'{ck}, {mk}, {lk}')
        dprint(f'{pipe}, {pipes}')

    ### Descend one level at a time and bail out as soon as a key is missing.
    if ck not in pipes:
        return False
    if mk not in pipes[ck]:
        return False
    return lk in pipes[ck][mk]
Check if a Pipe is inside the pipes dictionary.
Parameters
- pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
- pipes (PipesDict): The dictionary to search inside.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating whether the pipe is inside the dictionary.
def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]:
    """
    Look up the terminal's dimensions.

    If `os.get_terminal_size()` fails, read the `COLUMNS` and `LINES` environment
    variables instead, falling back to the provided defaults when they are unset.

    Parameters
    ----------
    default_cols: int, default 100
        If the columns cannot be determined, return this value.

    default_lines: int, default 120
        If the lines cannot be determined, return this value.

    Returns
    -------
    A tuple of integers for the columns and lines.
    """
    import os

    try:
        terminal = os.get_terminal_size()
        return terminal.columns, terminal.lines
    except Exception:
        ### No usable terminal (e.g. output is piped): consult the environment.
        env = os.environ
        return (
            int(env.get('COLUMNS', str(default_cols))),
            int(env.get('LINES', str(default_lines))),
        )
Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).
Parameters
- default_cols (int, default 100): If the columns cannot be determined, return this value.
- default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
- A tuple of integers for the columns and lines.
def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None):
    """
    Walk through an iterable in fixed-size groups.
    https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks

    Parameters
    ----------
    iterable: Iterable[Any]
        The iterable to iterate over in chunks.

    chunksize: int
        The number of items per chunk.

    fillvalue: Optional[Any], default None
        If the final chunk comes up short, pad it with this value.

    Returns
    -------
    An iterator of tuples, each of length `chunksize`.

    """
    from itertools import zip_longest

    ### Every slot of the zip shares one iterator, so consecutive items land in the same tuple.
    shared_iterator = iter(iterable)
    slots = (shared_iterator,) * chunksize
    return zip_longest(*slots, fillvalue=fillvalue)
Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
Parameters
- iterable (Iterable[Any]): The iterable to iterate over in chunks.
- chunksize (int): The size of chunks to iterate with.
- fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
- A generator of tuples of size `chunksize`.
def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """
    Return a new dictionary whose items are ordered by their values.

    Parameters
    ----------
    d: Dict[Any, Any]
        The dictionary to be sorted.

    Returns
    -------
    A new dictionary ordered by ascending value.
    If sorting fails (e.g. the values cannot be compared), `d` itself is returned.

    Examples
    --------
    >>> sorted_dict({'b': 1, 'a': 2})
    {'b': 1, 'a': 2}
    >>> sorted_dict({'b': 2, 'a': 1})
    {'a': 1, 'b': 2}

    """
    try:
        pairs_by_value = sorted(d.items(), key=lambda pair: pair[1])
        return dict(pairs_by_value)
    except Exception:
        return d
Sort a dictionary by its values and return a new dictionary.
Parameters
- d (Dict[Any, Any]): The dictionary to be sorted.
Returns
- A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
def flatten_pipes_dict(pipes_dict: PipesDict) -> List[mrsm.Pipe]:
    """
    Convert the standard pipes dictionary into a list.

    Parameters
    ----------
    pipes_dict: PipesDict
        The pipes dictionary to be flattened
        (three levels of nesting; the innermost values are collected).

    Returns
    -------
    A list of the innermost values (the `Pipe` objects), in dictionary iteration order.

    """
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]
Convert the standard pipes dictionary into a list.
Parameters
- pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
- A list of `Pipe` objects.
def round_time(
    dt: Optional[datetime] = None,
    date_delta: Optional[timedelta] = None,
    to: 'str' = 'down'
) -> datetime:
    """
    Snap a datetime to a multiple of a timedelta.
    http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python

    NOTE: The rounding offset is computed from the naive wall-clock value
    (`tzinfo` is ignored), and the result is `dt` shifted by that offset.
    When `dt` is `None`, a naive datetime for the current UTC time is used.

    Parameters
    ----------
    dt: Optional[datetime], default None
        If `None`, grab the current UTC datetime.

    date_delta: Optional[timedelta], default None
        If `None`, use a delta of 1 minute.

    to: 'str', default 'down'
        Available options are `'up'`, `'down'`, and `'closest'`.
        Any value other than `'up'` or `'down'` is treated as `'closest'`.

    Returns
    -------
    A rounded `datetime` object.

    Examples
    --------
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
    datetime.datetime(2022, 1, 1, 12, 15)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
    datetime.datetime(2022, 1, 1, 12, 16)
    >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 15, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 12, 0)
    >>> round_time(
    ...   datetime(2022, 1, 1, 12, 45, 57, 200),
    ...   timedelta(hours=1),
    ...   to = 'closest'
    ... )
    datetime.datetime(2022, 1, 1, 13, 0)

    """
    if date_delta is None:
        date_delta = timedelta(minutes=1)
    if dt is None:
        dt = datetime.now(timezone.utc).replace(tzinfo=None)

    step = date_delta.total_seconds()

    ### `.seconds` is the whole-seconds part within the day (days are not included).
    naive_dt = dt.replace(tzinfo=None)
    elapsed = (naive_dt - dt.min.replace(tzinfo=None)).seconds

    ### A value already sitting on a boundary must not be pushed to the next one.
    on_boundary = elapsed % step == 0 and dt.microsecond == 0

    if to == 'up' and not on_boundary:
        target = (elapsed + dt.microsecond / 1000000 + step) // step * step
    elif to == 'down' and not on_boundary:
        target = elapsed // step * step
    else:
        target = (elapsed + step / 2) // step * step

    return dt + timedelta(0, target - elapsed, -dt.microsecond)
Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
NOTE: This function strips timezone information!
Parameters
- dt (Optional[datetime], default None): If `None`, grab the current UTC datetime.
- date_delta (Optional[timedelta], default None): If `None`, use a delta of 1 minute.
- to ('str', default 'down'): Available options are `'up'`, `'down'`, and `'closest'`.
Returns
- A rounded `datetime` object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 15, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 45, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
def timed_input(
    seconds: int = 10,
    timeout_message: str = "",
    prompt: str = "",
    icon: bool = False,
    **kw
) -> Union[str, None]:
    """
    Accept user input only for a brief period of time.

    NOTE: This relies on `signal.SIGALRM`, which only exists on POSIX platforms,
    and signal handlers may only be installed from the main thread.

    Parameters
    ----------
    seconds: int, default 10
        The number of seconds to wait.

    timeout_message: str, default ''
        The message to print after the window has elapsed.
        Nothing is printed when this is empty.

    prompt: str, default ''
        The prompt to print during the window.

    icon: bool, default False
        Accepted for compatibility; not used by this function.

    Returns
    -------
    The input string entered by the user, or `None` if the window elapsed first.

    """
    import signal
    import time

    class TimeoutExpired(Exception):
        """Raise this exception when the timeout is reached."""

    def alarm_handler(signum, frame):
        raise TimeoutExpired

    ### Install our handler, remembering the old one so it can be restored afterwards.
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    if previous_handler is None:
        ### `None` means the old handler was not installed from Python and cannot be re-installed.
        previous_handler = signal.SIG_DFL
    signal.alarm(seconds) # produce SIGALRM in `seconds` seconds

    try:
        try:
            return input(prompt)
        except (EOFError, RuntimeError):
            ### stdin is unusable: show the prompt and wait out the window instead.
            ### The pending alarm interrupts the sleep with `TimeoutExpired`.
            print(prompt)
            time.sleep(seconds)
            return None
    except TimeoutExpired:
        if timeout_message:
            print(timeout_message)
        return None
    finally:
        signal.alarm(0) # cancel alarm
        signal.signal(signal.SIGALRM, previous_handler)
Accept user input only for a brief period of time.
Parameters
- seconds (int, default 10): The number of seconds to wait.
- timeout_message (str, default ''): The message to print after the window has elapsed.
- prompt (str, default ''): The prompt to print during the window.
- icon (bool, default False):
If `True`, print the configured input icon.
Returns
- The input string entered by the user.
def replace_pipes_in_dict(
    pipes: Optional[PipesDict] = None,
    func: 'function' = str,
    debug: bool = False,
    **kw
) -> PipesDict:
    """
    Build a copy of a pipes dictionary in which every leaf is replaced by `func(leaf)`.

    Parameters
    ----------
    pipes: Optional[PipesDict], default None
        The pipes dict to be processed.
        If `None`, the dictionary is fetched with `meerschaum.get_pipes(debug=debug, **kw)`.

    func: Callable[[Any], Any], default str
        The function to be applied to every non-dictionary value.
        Defaults to the string constructor.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A deep copy of `pipes` where every pipe is replaced with the output of `func`.

    """
    import copy

    def _transform_leaves(node: Dict[Any, Any], transform: 'function') -> None:
        ### Recurse into nested dictionaries; overwrite everything else in place.
        for key, value in node.items():
            if isinstance(value, dict):
                _transform_leaves(value, transform)
                continue
            node[key] = transform(value)

    if pipes is None:
        from meerschaum import get_pipes
        pipes = get_pipes(debug=debug, **kw)

    ### Work on a copy so the caller's dictionary is left untouched.
    replaced = copy.deepcopy(pipes)
    _transform_leaves(replaced, func)
    return replaced
Replace the Pipes in a Pipes dict with the result of another function.
Parameters
- pipes (Optional[PipesDict], default None): The pipes dict to be processed.
- func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary where every pipe is replaced with the output of a function.
def enforce_gevent_monkey_patch():
    """
    Apply gevent monkey patching unless `socket.socket` is already gevent's socket class.
    """
    from meerschaum.utils.packages import attempt_import
    import socket

    gevent, gevent_socket, gevent_monkey = attempt_import(
        'gevent', 'gevent.socket', 'gevent.monkey'
    )
    ### The standard socket class being gevent's class means patching already happened.
    already_patched = socket.socket is gevent_socket.socket
    if already_patched:
        return
    gevent_monkey.patch_all()
Check if gevent monkey patching is enabled, and if not, then apply patching.
def is_valid_email(email: str) -> Union['re.Match', None]:
    """
    Check whether a string looks like an email address.

    This is a pragmatic format check, not a full RFC 5322 validator.
    The local part is made of alphanumeric runs separated by single
    `.`, `_`, `+` or `-` characters; the domain may contain subdomains
    and must end with a top-level label of at least two characters.
    The whole string must match (a trailing newline is rejected).

    Parameters
    ----------
    email: str
        The string to be examined.

    Returns
    -------
    None if a string is not in email format, otherwise a `re.Match` object, which is truthy.

    Examples
    --------
    >>> is_valid_email('foo')
    >>> is_valid_email('foo@foo.com')
    <re.Match object; span=(0, 11), match='foo@foo.com'>
    >>> bool(is_valid_email('First.Last+tag@mail.example.info'))
    True

    """
    import re
    regex = (
        r'[A-Za-z0-9]+(?:[._+-][A-Za-z0-9]+)*'   # local part
        r'@'
        r'\w[\w-]*(?:\.\w[\w-]*)*'               # domain labels (subdomains allowed)
        r'\.\w{2,}'                              # top-level label
    )
    return re.fullmatch(regex, email)
Check whether a string is a valid email.
Parameters
- email (str): The string to be examined.
Returns
- None if a string is not in email format, otherwise a `re.Match` object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
def string_width(string: str, widest: bool = True) -> int:
    """
    Return the length of the longest line in a string.

    Parameters
    ----------
    string: str
        The string to be examined. Lines are separated by `'\\n'`.

    widest: bool, default True
        No longer used because `widest` is always assumed to be true.

    Returns
    -------
    An integer for the text's visual width (the character count of the widest line).

    Examples
    --------
    >>> string_width('a')
    1
    >>> string_width('a\\nbc\\nd')
    2

    """
    ### `split` always yields at least one element, so `max` never sees an empty sequence.
    return max(len(line) for line in string.split('\n'))
Calculate the width of a string, either by its widest or last line.
Parameters
- string (str): The string to be examined.
- widest (bool, default True): No longer used because `widest` is always assumed to be true.
Returns
- An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
def _pyinstaller_traverse_dir(
    directory: str,
    ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'),
    include_dotfiles: bool = False
) -> list:
    """
    Recursively traverse a directory and return a list of its contents.

    Parameters
    ----------
    directory: str
        The directory to walk. May be absolute or relative.

    ignore_patterns: Iterable[str], default ('.pyc', 'dist', 'build', '.git', '.log')
        Substrings which exclude a directory or file when found in its path.
        A `/` inside a pattern is translated to the platform's path separator.
        NOTE: for files, the pattern is tested against the full joined path
        (including the part leading up to `directory`).

    include_dotfiles: bool, default False
        If `False`, skip files whose name starts with `.` and directories
        whose path relative to the parent of `directory` starts with `.`.

    Returns
    -------
    A list of `(file_path, relative_dir)` tuples, where `relative_dir` is the file's
    containing directory expressed relative to the parent of `directory`.
    """
    import os
    import pathlib

    _directory = pathlib.Path(directory)
    base = _directory.parent
    patterns = [pattern.replace('/', os.path.sep) for pattern in ignore_patterns]

    def _found_pattern(name: str) -> bool:
        return any(pattern in str(name) for pattern in patterns)

    def _relative_to_base(path: str) -> str:
        ### Slicing off `len(str(base))` characters breaks for relative inputs
        ### (the parent of 'pkg' is '.'), so compute a real relative path instead.
        rel = os.path.relpath(path, base)
        return '' if rel == os.curdir else rel

    paths = []
    for root, dirs, files in os.walk(_directory):
        _root = _relative_to_base(root)
        if _root.startswith('.') and not include_dotfiles:
            continue
        ### ignore certain patterns
        if _found_pattern(_root):
            continue

        for filename in files:
            if filename.startswith('.') and not include_dotfiles:
                continue
            path = os.path.join(root, filename)
            if _found_pattern(path):
                continue
            ### Every file in `root` shares the same relative directory.
            paths.append((path, _root))
    return paths
Recursively traverse a directory and return a list of its contents.
def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]:
    """
    Recursively replace passwords in a dictionary.

    Parameters
    ----------
    d: Dict[str, Any]
        The dictionary to search through.

    replace_with: str, default '*'
        The string to replace each character of the password with.
        This is applied at every nesting level.

    Returns
    -------
    Another dictionary where values to keys containing `'password'` (case-insensitive)
    are replaced with `replace_with` (`'*'`). For keys equal to `'uri'`, the password
    portion parsed by `SQLConnector.parse_uri` is masked inside the URI string.
    The input dictionary is not modified.

    Examples
    --------
    >>> replace_password({'a': 1})
    {'a': 1}
    >>> replace_password({'password': '123'})
    {'password': '***'}
    >>> replace_password({'nested': {'password': '123'}})
    {'nested': {'password': '***'}}
    >>> replace_password({'password': '123'}, replace_with='!')
    {'password': '!!!'}
    >>> replace_password({'nested': {'password': '123'}}, replace_with='!')
    {'nested': {'password': '!!!'}}

    """
    import copy

    def _mask(secret: Any) -> str:
        ### One `replace_with` per character of the secret's string form.
        return replace_with * len(str(secret))

    _d = copy.deepcopy(d)
    for k, v in d.items():
        key = str(k).lower()
        if isinstance(v, dict):
            ### Forward `replace_with` so nested passwords use the same mask.
            _d[k] = replace_password(v, replace_with=replace_with)
        elif 'password' in key:
            _d[k] = _mask(v)
        elif key == 'uri':
            from meerschaum.connectors.sql import SQLConnector
            try:
                uri_params = SQLConnector.parse_uri(v)
            except Exception:
                ### Unparseable URIs are left untouched.
                uri_params = None
            if not uri_params:
                continue
            if 'username' not in uri_params or 'password' not in uri_params:
                continue
            credentials = uri_params['username'] + ':' + uri_params['password']
            _d[k] = v.replace(
                credentials,
                uri_params['username'] + ':' + _mask(uri_params['password']),
            )
    return _d
Recursively replace passwords in a dictionary.
Parameters
- d (Dict[str, Any]): The dictionary to search through.
- replace_with (str, default '*'): The string to replace each character of the password with.
Returns
- Another dictionary where values to the keys `'password'` are replaced with `replace_with` (`'*'`).
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
def filter_arguments(
    func: Callable[[Any], Any],
    *args: Any,
    **kwargs: Any
) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Filter out the positional and keyword arguments which `func` does not support.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        Positional arguments to filter (delegated to `filter_positionals`).

    **kwargs
        Keyword arguments to filter (delegated to `filter_keywords`).

    Returns
    -------
    A tuple of the filtered `args` and `kwargs`.
    """
    return (
        filter_positionals(func, *args),
        filter_keywords(func, **kwargs),
    )
Filter out unsupported positional and keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): Positional arguments to filter and pass to `func`.
- **kwargs: Keyword arguments to filter and pass to `func`.
Returns
- The `args` and `kwargs` accepted by `func`.
def filter_keywords(
    func: Callable[[Any], Any],
    **kw: Any
) -> Dict[str, Any]:
    """
    Filter out unsupported keyword arguments.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    **kw: Any
        The arguments to be filtered and passed into `func`.

    Returns
    -------
    A dictionary of keyword arguments accepted by `func`.
    If `func` declares a `**kwargs`-style parameter, `kw` is returned unfiltered.
    Names of positional-only and `*args` parameters are dropped
    because they cannot be supplied by keyword.

    Examples
    --------
    ```python
    >>> def foo(a=1, b=2):
    ...     return a * b
    >>> filter_keywords(foo, a=2, b=4, c=6)
    {'a': 2, 'b': 4}
    >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
    8
    ```

    """
    import inspect
    Parameter = inspect.Parameter
    func_params = inspect.signature(func).parameters

    ### If the function has a **kw parameter, skip filtering.
    ### Inspect the parameter kind rather than its string form,
    ### which may contain '**' for unrelated reasons (e.g. a default value).
    if any(param.kind is Parameter.VAR_KEYWORD for param in func_params.values()):
        return kw

    keyword_kinds = (Parameter.POSITIONAL_OR_KEYWORD, Parameter.KEYWORD_ONLY)
    return {
        k: v
        for k, v in kw.items()
        if k in func_params and func_params[k].kind in keyword_kinds
    }
Filter out unsupported keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- **kw (Any): The arguments to be filtered and passed into `func`.
Returns
- A dictionary of keyword arguments accepted by `func`.
Examples
>>> def foo(a=1, b=2):
... return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
def filter_positionals(
    func: Callable[[Any], Any],
    *args: Any
) -> Tuple[Any]:
    """
    Filter out unsupported positional arguments.

    Only the leading parameters of `func` which are positional and have no default
    are filled. Scanning stops at the first parameter that has a default,
    is keyword-only, or is a `*args` / `**kwargs` parameter.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The function to inspect.

    *args: Any
        The arguments to be filtered and passed into `func`.
        NOTE: If the function signature expects more arguments than provided,
        the missing slots will be filled with `None` (and a warning is emitted).

    Returns
    -------
    A tuple of positional arguments accepted by `func`.

    Examples
    --------
    ```python
    >>> def foo(a, b):
    ...     return a * b
    >>> filter_positionals(foo, 2, 4, 6)
    (2, 4)
    >>> foo(*filter_positionals(foo, 2, 4, 6))
    8
    ```

    """
    import inspect
    Parameter = inspect.Parameter
    positional_kinds = (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)

    acceptable_args: List[Any] = []
    num_invalid: int = 0
    for i, param in enumerate(inspect.signature(func).parameters.values()):
        ### Decide from the parameter's kind and default, not from its string form
        ### (annotations may contain '=' or '*').
        if param.kind not in positional_kinds or param.default is not Parameter.empty:
            break
        if i < len(args):
            acceptable_args.append(args[i])
        else:
            acceptable_args.append(None)
            num_invalid += 1

    if num_invalid > 0:
        ### Import lazily: the warning helper is only needed on this path.
        from meerschaum.utils.warnings import warn
        warn(
            "Too few arguments were provided. "
            + f"{num_invalid} argument"
            + ('s have' if num_invalid != 1 else ' has')
            + " been filled with `None`.",
        )
    return tuple(acceptable_args)
Filter out unsupported positional arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any): The arguments to be filtered and passed into `func`. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled with `None`.
Returns
- A tuple of positional arguments accepted by `func`.
Examples
>>> def foo(a, b):
... return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]:
    """
    Build a plain `dict` from an `OrderedDict`, converting nested
    `OrderedDict` values recursively.

    Values which are not `OrderedDict` instances are kept as they are.
    The original `OrderedDict` is not mutated.
    """
    from collections import OrderedDict
    return {
        key: (dict_from_od(value) if isinstance(value, OrderedDict) else value)
        for key, value in od.items()
    }
Convert an ordered dict to a dict. Does not mutate the original OrderedDict.
def remove_ansi(s: str) -> str:
    """
    Strip ANSI escape sequences from a string.

    Parameters
    ----------
    s: str
        The string to be cleaned.

    Returns
    -------
    A string with the ANSI characters removed.

    Examples
    --------
    >>> remove_ansi("\\x1b[1;31mHello, World!\\x1b[0m")
    'Hello, World!'

    """
    import re
    ### ESC followed by either a single control character or a `[`-introduced sequence.
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    return ansi_escape.sub('', s)
Remove ANSI escape characters from a string.
Parameters
- s (str): The string to be cleaned.
Returns
- A string with the ANSI characters removed.
Examples
>>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m")
'Hello, World!'
def get_connector_labels(
    *types: str,
    search_term: str = '',
    ignore_exact_match = True,
    _additional_options: Optional[List[str]] = None,
) -> List[str]:
    """
    Read connector labels from the configuration dictionary.

    Parameters
    ----------
    *types: str
        The connector types.
        If none are provided, use every type found under the `meerschaum:connectors`
        configuration plus `'plugin'`.

    search_term: str, default ''
        Only labels beginning with this string are returned.

    ignore_exact_match: bool, default True
        If `True`, skip a connector if the search_term is an exact match.

    _additional_options: Optional[List[str]], default None
        Extra candidate labels to consider alongside the configured ones.

    Returns
    -------
    A sorted list of `'<type>:<label>'` keys of defined connectors.
    Labels named `'default'` are excluded.

    """
    from meerschaum.config import get_config
    connectors = get_config('meerschaum', 'connectors')

    requested_types = list(types)
    if not requested_types:
        requested_types = list(connectors.keys()) + ['plugin']

    labels = []
    for conn_type in requested_types:
        if conn_type != 'plugin':
            labels.extend(
                f'{conn_type}:{label}'
                for label in connectors.get(conn_type, {})
                if label != 'default'
            )
            continue

        ### Plugin connectors are named after the last segment of the module name.
        from meerschaum.plugins import get_data_plugins
        labels.extend(
            f'{conn_type}:' + plugin.module.__name__.split('.')[-1]
            for plugin in get_data_plugins()
        )

    if _additional_options:
        labels.extend(_additional_options)

    ### An empty string never equals a label, so nothing is excluded in that case.
    excluded = search_term if ignore_exact_match else ''
    return sorted(
        label
        for label in labels
        if label.startswith(search_term) and label != excluded
    )
Read connector labels from the configuration dictionary.
Parameters
- *types (str): The connector types. If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`.
- search_term (str, default ''): A filter on the connectors' labels.
- ignore_exact_match (bool, default True): If `True`, skip a connector if the search_term is an exact match.
Returns
- A list of the keys of defined connectors.
def json_serialize_datetime(dt: datetime) -> Union[str, None]:
    """
    Serialize a datetime object into JSON (ISO format string).

    Timezone-naive datetimes get a `'Z'` suffix appended;
    timezone-aware datetimes keep the offset emitted by `isoformat()`.
    Anything that is not a `datetime` yields `None`.

    Examples
    --------
    >>> import json
    >>> from datetime import datetime
    >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
    '{"a": "2022-01-01T00:00:00Z"}'

    """
    if not isinstance(dt, datetime):
        return None

    iso_text = dt.isoformat()
    if dt.tzinfo is None:
        iso_text += 'Z'
    return iso_text
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'
def wget(
    url: str,
    dest: Optional[Union[str, 'pathlib.Path']] = None,
    headers: Optional[Dict[str, Any]] = None,
    color: bool = True,
    debug: bool = False,
    **kw: Any
) -> 'pathlib.Path':
    """
    Mimic `wget` with `urllib.request`.

    Parameters
    ----------
    url: str
        The URL to the resource to be downloaded.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination path of the downloaded file.
        If `None`, save to the current directory, using the file name from the
        `content-disposition` header when present, else the last segment of `url`.

    headers: Optional[Dict[str, Any]], default None
        Request headers to send.

    color: bool, default True
        If `True`, report through `dprint` / `error`.
        If `False`, use plain `print` and exit the process with status 1 on failure.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Ignored.

    Returns
    -------
    The path to the downloaded file.

    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    import os
    import pathlib
    import re
    import ssl
    import sys
    import urllib.request

    _print = dprint if color else print
    request = urllib.request.Request(url, headers=(headers if headers is not None else {}))
    if debug:
        _print(f"Downloading from '{url}'...")

    try:
        response = urllib.request.urlopen(request)
    except Exception:
        ### NOTE(security): the retry below disables TLS certificate verification.
        ### The unverified context is scoped to this one request instead of
        ### replacing the process-wide default HTTPS context.
        try:
            response = urllib.request.urlopen(
                request,
                context=ssl._create_unverified_context(),
            )
        except Exception as _e:
            print(_e)
            response = None

    if response is None or response.code != 200:
        if response is not None:
            response.close()
        error_msg = f"Failed to download from '{url}'."
        if color:
            error(error_msg)
        else:
            print(error_msg)
            sys.exit(1)

    ### Close the connection once the payload has been written.
    with response:
        disposition = response.headers.get('content-disposition', None)
        disposition_names = (
            re.findall("filename=(.+)", disposition)
            if disposition is not None
            else []
        )
        ### Fall back to the URL when the header carries no file name.
        fname = (
            disposition_names[0].strip('"')
            if disposition_names
            else url.split('/')[-1]
        )

        if dest is None:
            dest = pathlib.Path(os.path.join(os.getcwd(), fname))
        elif isinstance(dest, str):
            dest = pathlib.Path(dest)

        with open(dest, 'wb') as f:
            f.write(response.read())

    if debug:
        _print(f"Downloaded file '{dest}'.")

    return dest
Mimic `wget` (the implementation uses `urllib.request`).
Parameters
- url (str): The URL to the resource to be downloaded.
- dest (Optional[Union[str, pathlib.Path]], default None): The destination path of the downloaded file. If `None`, save to the current directory.
- color (bool, default True): If `debug` is `True`, print color output.
- debug (bool, default False): Verbosity toggle.
Returns
- The path to the downloaded file.
def async_wrap(func):
    """
    Run a synchronous function as async.
    https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn

    The returned coroutine function accepts the same arguments as `func`
    plus two optional keyword-only arguments:

    - `loop`: the event loop to use (defaults to the currently running loop).
    - `executor`: the executor passed to `loop.run_in_executor`
      (`None` selects the loop's default executor).
    """
    import asyncio
    from functools import wraps, partial

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            ### We are inside a coroutine, so a loop is guaranteed to be running.
            loop = asyncio.get_running_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return run
Run a synchronous function as async. https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
def debug_trace(browser: bool = True):
    """
    Open a web-based debugger to trace the execution of the program.

    This is a thin wrapper which forwards `browser` to `meerschaum.utils.debug.trace`.
    """
    from meerschaum.utils.debug import trace as _trace
    _trace(browser=browser)
Open a web-based debugger to trace the execution of the program.
This is an alias for `meerschaum.utils.debug.trace`.
def items_str(
    items: List[Any],
    quotes: bool = True,
    quote_str: str = "'",
    commas: bool = True,
    comma_str: str = ',',
    and_: bool = True,
    and_str: str = 'and',
    oxford_comma: bool = True,
    spaces: bool = True,
    space_str = ' ',
) -> str:
    """
    Return a formatted string of list items separated by commas.

    Parameters
    ----------
    items: List[Any]
        The items to be printed as an English list.
        Any sized collection is accepted; each item is converted with `str()`.

    quotes: bool, default True
        If `True`, wrap items in quotes.

    quote_str: str, default "'"
        If `quotes` is `True`, prepend and append each item with this string.

    commas: bool, default True
        If `True`, separate items with `comma_str`.

    comma_str: str, default ','
        If `commas` is `True`, separate items with this string.

    and_: bool, default True
        If `True`, include the word 'and' before the final item in the list.

    and_str: str, default 'and'
        If `and_` is True, insert this string where 'and' normally would in an English list.

    oxford_comma: bool, default True
        If `True`, include the Oxford Comma (comma before the final 'and').
        Only applies when `and_` is `True` and there are at least three items.

    spaces: bool, default True
        If `True`, separate items with `space_str`.

    space_str: str, default ' '
        If `spaces` is `True`, separate items with this string.

    Returns
    -------
    A string of the items as an English list.

    Examples
    --------
    >>> items_str([1,2,3])
    "'1', '2', and '3'"
    >>> items_str([1,2,3], quotes=False)
    '1, 2, and 3'
    >>> items_str([1,2,3], and_=False)
    "'1', '2', '3'"
    >>> items_str([1,2,3], spaces=False, and_=False)
    "'1','2','3'"
    >>> items_str([1,2,3], oxford_comma=False)
    "'1', '2' and '3'"
    >>> items_str([1,2,3], quote_str=":")
    ':1:, :2:, and :3:'
    >>> items_str([1,2,3], and_str="or")
    "'1', '2', or '3'"
    >>> items_str([1,2,3], space_str="_")
    "'1',_'2',_and_'3'"
    >>> items_str([1,2], and_=False)
    "'1', '2'"

    """
    if not items:
        return ''

    q = quote_str if quotes else ''
    s = space_str if spaces else ''
    a = and_str if and_ else ''
    c = comma_str if commas else ''

    ### Materialize once so sets, tuples, and other collections behave like lists.
    quoted = [q + str(item) + q for item in items]
    if len(quoted) == 1:
        return quoted[0]

    ### Without a conjunction, every item (including the last) is comma-separated.
    if not and_:
        return (c + s).join(quoted)

    output = (c + s).join(quoted[:-1])
    if oxford_comma and len(quoted) > 2:
        output += c
    return output + s + a + s + quoted[-1]
Return a formatted string of list items separated by commas.
Parameters
- items (List[Any]): The items to be printed as an English list.
- quotes (bool, default True): If `True`, wrap items in quotes.
- quote_str (str, default "'"): If `quotes` is `True`, prepend and append each item with this string.
- and_ (bool, default True): If `True`, include the word 'and' before the final item in the list.
- and_str (str, default 'and'): If `and_` is `True`, insert this string where 'and' normally would in an English list.
- oxford_comma (bool, default True): If `True`, include the Oxford Comma (comma before the final 'and'). Only applies when `and_` is `True`.
- spaces (bool, default True): If `True`, separate items with `space_str`.
- space_str (str, default ' '): If `spaces` is `True`, separate items with this string.
Returns
- A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
def interval_str(delta: Union[timedelta, int]) -> str:
    """
    Format a `timedelta` (or an `int` number of minutes) as a human-readable string.

    Parameters
    ----------
    delta: Union[timedelta, int]
        The interval to print. If `delta` is not a `timedelta`,
        it is treated as a number of minutes.

    Returns
    -------
    The string produced by `humanfriendly.format_timespan` for the interval in seconds.
    """
    from meerschaum.utils.packages import attempt_import
    humanfriendly = attempt_import('humanfriendly')

    if isinstance(delta, timedelta):
        total_seconds = delta.total_seconds()
    else:
        ### Bare numbers are minutes.
        total_seconds = delta * 60
    return humanfriendly.format_timespan(total_seconds)
Return a human-readable string for a `timedelta` (or `int` minutes).
Parameters
- delta (Union[timedelta, int]): The interval to print. If `delta` is an integer, assume it corresponds to minutes.
Returns
- A formatted string, fit for human eyes.
def is_docker_available() -> bool:
    """
    Check if we can connect to the Docker engine.

    Runs `docker ps` with its output discarded and reports whether it exited with status 0.
    Any exception while launching the command (e.g. `docker` is not installed)
    counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['docker', 'ps'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if we can connect to the Docker engine.
def is_android() -> bool:
    """
    Return `True` if the current platform is Android.

    Detection relies on the presence of the `sys.getandroidapilevel` attribute.
    """
    import sys
    _missing = object()
    return getattr(sys, 'getandroidapilevel', _missing) is not _missing
Return `True` if the current platform is Android.
def is_bcp_available() -> bool:
    """
    Check if the MSSQL `bcp` utility is installed.

    Runs `bcp -v` with its output discarded and reports whether it exited with status 0.
    Any exception while launching the command counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['bcp', '-v'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if the MSSQL `bcp` utility is installed.
def is_systemd_available() -> bool:
    """
    Check if running on systemd.

    Runs `systemctl -h` with its output discarded and reports whether it exited
    with status 0. Any exception while launching the command counts as unavailable.
    """
    import subprocess
    try:
        exit_code = subprocess.call(
            ['systemctl', '-h'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except Exception:
        return False
    return exit_code == 0
Check if running on systemd.
def get_last_n_lines(file_name: str, N: int) -> List[str]:
    """
    Return the last `N` lines of a file, like the `tail` command.
    https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/

    The file is read backwards in binary chunks and split on `'\\n'`;
    each line is decoded (UTF-8 by default) only after it has been isolated,
    so multi-byte characters are handled correctly.

    Parameters
    ----------
    file_name: str
        The path to the file to read.

    N: int
        The maximum number of trailing lines to return.
        If `N` is less than 1, every line of the file is returned.

    Returns
    -------
    A list of at most `N` strings in file order, without their newline characters.
    NOTE: A file ending in a newline yields an empty string as its final line,
    and an empty first line (file beginning with a newline) is omitted.
    """
    import os

    chunk_size = 8192
    chunks = []
    num_newlines = 0
    with open(file_name, 'rb') as read_obj:
        read_obj.seek(0, os.SEEK_END)
        position = read_obj.tell()
        ### `N` newlines are enough to isolate the `N` trailing lines.
        while position > 0 and (N < 1 or num_newlines < N):
            step = min(chunk_size, position)
            position -= step
            read_obj.seek(position)
            chunk = read_obj.read(step)
            num_newlines += chunk.count(b'\n')
            chunks.append(chunk)

    ### Chunks were gathered back-to-front.
    segments = b''.join(reversed(chunks)).split(b'\n')

    ### The data before the first newline only counts as a line if it is non-empty
    ### (this also makes an empty file produce an empty list).
    if position == 0 and not segments[0]:
        segments = segments[1:]

    if N >= 1:
        ### If the start of the file was not reached, the leading (possibly partial)
        ### segment is dropped here because at least `N` newlines were collected.
        segments = segments[-N:]

    return [segment.decode() for segment in segments]
def tail(f, n, offset=None):
    """
    https://stackoverflow.com/a/692616/9699829

    Reads n lines from f with an offset of offset lines. The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.

    Parameters
    ----------
    f:
        An open, seekable file object. Lines are returned in whatever type
        `f.read()` produces (`bytes` for binary files).

    n: int
        The number of lines to return.

    offset: Optional[int], default None
        The number of trailing lines to skip before the returned window.

    Returns
    -------
    A tuple of `(lines, has_more)`.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while True:
        try:
            ### The estimate becomes a float after growing; `seek` needs an integer.
            f.seek(-int(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        ### When the window starts mid-file, its first line may be incomplete,
        ### so one line more than requested is needed before returning.
        if len(lines) > to_read or pos == 0 or to_read <= 0:
            end = -offset if offset else None
            return lines[-to_read:end], len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
https://stackoverflow.com/a/692616/9699829
Reads n lines from f with an offset of offset lines. The return value is a tuple in the form `(lines, has_more)` where `has_more` is an indicator that is `True` if there are more lines in the file.
def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str:
    """
    Remove characters from each section of a string until the length is within the limit.

    Parameters
    ----------
    item: str
        The item name to be truncated.
        Items strictly shorter than `max_len` are returned unchanged.

    delimeter: str, default '_'
        Split `item` by this string into several sections
        (and join the shortened sections with it).
        An empty delimiter treats `item` as a single section.

    max_len: int, default 128
        The max acceptable length of the truncated version of `item`.

    Returns
    -------
    The truncated string.

    Raises
    ------
    Exception
        If every section is already one character long and the result is still too long.

    Examples
    --------
    >>> truncate_string_sections('abc_def_ghi', max_len=10)
    'ab_de_gh'
    >>> truncate_string_sections('abc-def-ghi', delimeter='-', max_len=10)
    'ab-de-gh'

    """
    if len(item) < max_len:
        return item

    sections = item.split(delimeter) if delimeter else [item]
    ### Reserve room for one delimiter per section.
    available_chars = max_len - (len(sections) * len(delimeter))

    sections_len = sum(len(section) for section in sections)
    while sections_len > available_chars:
        ### Trim one trailing character from every section longer than one character.
        sections = [
            (section[:-1] if len(section) > 1 else section)
            for section in sections
        ]
        new_sections_len = sum(len(section) for section in sections)
        if new_sections_len == sections_len:
            raise Exception(f"String could not be truncated: '{item}'")
        sections_len = new_sections_len

    return delimeter.join(sections)
Remove characters from each section of a string until the length is within the limit.
Parameters
- item (str): The item name to be truncated.
- delimeter (str, default '_'): Split `item` by this string into several sections.
- max_len (int, default 128): The max acceptable length of the truncated version of `item`.
Returns
- The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
def separate_negation_values(
    vals: Union[List[str], Tuple[str]],
    negation_prefix: Optional[str] = None,
) -> Tuple[List[str], List[str]]:
    """
    Split values into those to include and those to exclude (negated).

    Parameters
    ----------
    vals: Union[List[str], Tuple[str]]
        A list of values to parse. Each value is compared by its string form.

    negation_prefix: Optional[str], default None
        Values whose string form starts with this prefix go into the second list,
        with the prefix removed.
        If `None`, the prefix is read from
        `STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`.

    Returns
    -------
    A tuple of two lists: the included values (unchanged) and the
    excluded values (as strings, without the prefix).
    """
    if negation_prefix is None:
        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    included, excluded = [], []
    prefix_len = len(negation_prefix)
    for val in vals:
        text = str(val)
        if text.startswith(negation_prefix):
            excluded.append(text[prefix_len:])
            continue
        included.append(val)

    return included, excluded
Separate the negated values from the positive ones. Return two lists: positive and negative values.
Parameters
- vals (Union[List[str], Tuple[str]]): A list of strings to parse.
- negation_prefix (Optional[str], default None): Include values that start with this string in the second list. If `None`, use the system default (`_`).
def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]:
    """
    Translate a params dictionary into lists of include- and exclude-values.

    Parameters
    ----------
    params: Optional[Dict[str, Any]]
        A params query dictionary. Values which are not lists or tuples
        are treated as single-element lists.

    Returns
    -------
    A dictionary mapping keys to a tuple of lists for include and exclude values
    (as produced by `separate_negation_values`).
    An empty dictionary is returned when `params` is empty or `None`.

    Examples
    --------
    >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
    {'a': (['b', 'c', 'e'], ['d', 'f'])}
    """
    in_ex_params = {}
    if not params:
        return in_ex_params

    for col, val in params.items():
        vals = val if isinstance(val, (list, tuple)) else [val]
        in_ex_params[col] = separate_negation_values(vals)
    return in_ex_params
Translate a params dictionary into lists of include- and exclude-values.
Parameters
- params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
- A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
def flatten_list(list_: List[Any]) -> Generator[Any, None, None]:
    """
    Recursively flatten a list.

    This is a generator: wrap the call in `list()` to materialize the result.
    Only `list` instances are flattened; tuples and other containers are yielded as-is.
    Nesting is tracked on an explicit stack, so very deep nesting
    does not hit the interpreter's recursion limit.

    Parameters
    ----------
    list_: List[Any]
        The (possibly nested) list to flatten.

    Yields
    ------
    The non-list items, depth-first and in their original order.

    Examples
    --------
    >>> list(flatten_list([1, [2, [3, 4]], 5]))
    [1, 2, 3, 4, 5]
    """
    stack = [iter(list_)]
    while stack:
        for item in stack[-1]:
            if isinstance(item, list):
                ### Descend; the parent iterator resumes once this one is exhausted.
                stack.append(iter(item))
                break
            yield item
        else:
            stack.pop()
Recursively flatten a list.
def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]:
    """
    Parse a string containing the text to be passed into a function
    and return a tuple of args, kwargs.

    The text is parsed as the argument list of a function call, so values may
    themselves contain commas or `=` (strings, lists, dicts, tuples).
    Every value must be a Python literal (it is evaluated with `ast.literal_eval`).

    Parameters
    ----------
    args_str: str
        The contents of the function parameter (as a string).

    Returns
    -------
    A tuple of args (tuple) and kwargs (dict[str, Any]).
    An empty string yields `((), {})`.

    Raises
    ------
    ValueError, SyntaxError
        If a value is not a valid Python literal.

    Examples
    --------
    >>> parse_arguments_str('123, 456, foo=789, bar="baz"')
    ((123, 456), {'foo': 789, 'bar': 'baz'})
    >>> parse_arguments_str('[1, 2], key={"a": 1}')
    (([1, 2],), {'key': {'a': 1}})
    """
    import ast

    ### Preferred path: let Python's own parser split the arguments.
    try:
        call = ast.parse(f"_({args_str})", mode='eval').body
    except SyntaxError:
        call = None

    is_plain_call = (
        isinstance(call, ast.Call)
        and isinstance(call.func, ast.Name)
        and all(keyword.arg is not None for keyword in call.keywords)
    )
    if is_plain_call:
        parsed_args = tuple(ast.literal_eval(node) for node in call.args)
        parsed_kwargs = {
            keyword.arg: ast.literal_eval(keyword.value)
            for keyword in call.keywords
        }
        return parsed_args, parsed_kwargs

    ### Fallback: simple comma splitting for text which is not valid call syntax
    ### (e.g. positional values after keywords).
    args = []
    kwargs = {}
    for part in args_str.split(','):
        if '=' in part:
            key, val = part.split('=', 1)
            kwargs[key.strip()] = ast.literal_eval(val.strip())
        else:
            args.append(ast.literal_eval(part.strip()))

    return tuple(args), kwargs
Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.
Parameters
- args_str (str): The contents of the function parameter (as a string).
Returns
- A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
((123, 456), {'foo': 789, 'bar': 'baz'})
def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple:
    """
    Create a symlink at `dest_path` pointing to `src_path`,
    falling back to Windows-specific alternatives when symlinking fails.

    Parameters
    ----------
    src_path: pathlib.Path
        The source path (the link's target).

    dest_path: pathlib.Path
        The destination path (where the link is created).

    Returns
    -------
    A SuccessTuple indicating success.
    On failure, the message is the text of the exception that was raised.
    """
    already_linked = dest_path.exists() and dest_path.resolve() == src_path.resolve()
    if already_linked:
        return True, "Symlink already exists."

    try:
        dest_path.symlink_to(src_path)
    except Exception as e:
        symlink_error = str(e)
    else:
        return True, "Success"

    ### The symlink could not be created.
    ### Only Windows has fallbacks; everywhere else, report the error.
    import platform
    if platform.system() != 'Windows':
        return False, symlink_error

    try:
        import _winapi
    except ImportError:
        return False, "Unable to import _winapi."

    ### Directories become junctions.
    if src_path.is_dir():
        try:
            _winapi.CreateJunction(str(src_path), str(dest_path))
        except Exception as e:
            return False, str(e)
        return True, "Success"

    ### Last resort for files on Windows: copy instead of linking.
    import shutil
    try:
        shutil.copy(src_path, dest_path)
    except Exception as e:
        return False, str(e)
    return True, "Success"
Wrap around `pathlib.Path.symlink_to`, but add support for Windows.
Parameters
- src_path (pathlib.Path): The source path.
- dest_path (pathlib.Path): The destination path.
Returns
- A SuccessTuple indicating success.
def is_symlink(path: pathlib.Path) -> bool:
    """
    Determine whether `path` is a symlink, also recognizing Windows junctions.

    Parameters
    ----------
    path: pathlib.Path
        The path to check.

    Returns
    -------
    `True` if `path.is_symlink()` is true or, on Windows only,
    if `os.readlink()` returns a non-empty target for the path.
    """
    if not path.is_symlink():
        import platform, os
        if platform.system() != 'Windows':
            return False

        ### `os.readlink()` raises an `OSError` for paths which are not links.
        try:
            link_target = os.readlink(path)
        except OSError:
            return False
        return bool(link_target)

    return True
Wrap `path.is_symlink()` but add support for Windows junctions.
def parametrized(dec):
    """
    A meta-decorator for allowing other decorator functions to have parameters.

    https://stackoverflow.com/a/26151604/9699829

    Parameters
    ----------
    dec: Callable
        A decorator whose first positional argument is the function being decorated
        and whose remaining arguments are the decorator's parameters.

    Returns
    -------
    A decorator factory: calling it with parameters returns a decorator
    which invokes `dec(f, *args, **kwargs)` on the decorated function `f`.
    The factory keeps the name and docstring of `dec`.

    Examples
    --------
    >>> @parametrized
    ... def multiply(f, n):
    ...     def inner(*args, **kwargs):
    ...         return f(*args, **kwargs) * n
    ...     return inner
    ...
    >>> @multiply(3)
    ... def two():
    ...     return 2
    ...
    >>> two()
    6
    """
    from functools import wraps

    @wraps(dec)
    def layer(*args, **kwargs):
        def repl(f):
            return dec(f, *args, **kwargs)
        return repl

    ### `layer` does not take the leading function argument that `dec` does,
    ### so drop `__wrapped__` to keep `inspect.signature()` from reporting `dec`'s signature.
    layer.__dict__.pop('__wrapped__', None)
    return layer
A meta-decorator for allowing other decorator functions to have parameters.
def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None:
    """
    Safely extract a TAR file to a given directory.
    This defends against path traversal through member names (CVE-2007-4559).

    Parameters
    ----------
    tarf: file
        The TAR file opened with `tarfile.open(path, 'r:gz')`.

    output_dir: Union[str, pathlib.Path]
        The output directory.

    Raises
    ------
    Exception
        If any member's name resolves to a path outside of `output_dir`.
        All members are checked before extraction begins,
        so nothing is extracted in that case.

    Notes
    -----
    Only member names are validated here; link targets inside the archive are not inspected.
    """
    import os

    abs_output_dir = os.path.abspath(output_dir)

    def is_within_directory(target) -> bool:
        """Return whether `target` resolves to `output_dir` or a path beneath it."""
        abs_target = os.path.abspath(target)
        ### NOTE: `commonprefix` compares character-by-character, so `/tmp/out_evil`
        ### would wrongly count as inside `/tmp/out`; `commonpath` compares whole components.
        try:
            return os.path.commonpath([abs_output_dir, abs_target]) == abs_output_dir
        except ValueError:
            ### Raised when the paths cannot share a root (e.g. different Windows drives).
            return False

    for member in tarf.getmembers():
        member_path = os.path.join(output_dir, member.name)
        if not is_within_directory(member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tarf.extractall(path=output_dir)
Safely extract a TAR file to a given directory. This defends against CVE-2007-4559.
Parameters
- tarf (file): The TAR file opened with `tarfile.open(path, 'r:gz')`.
- output_dir (Union[str, pathlib.Path]): The output directory.
def choose_subaction(*args, **kwargs) -> Any:
    """
    Legacy shim kept so that older imports from this module keep working.
    All arguments are forwarded to `meerschaum.actions.choose_subaction`
    and its result is returned unchanged.
    """
    ### Imported lazily, inside the function, rather than at module load.
    from meerschaum.actions import choose_subaction as _actions_choose_subaction
    result = _actions_choose_subaction(*args, **kwargs)
    return result
Placeholder function to prevent breaking legacy behavior.
See `meerschaum.actions.choose_subaction`.
def print_options(*args, **kwargs) -> None:
    """
    Legacy shim kept so that older imports from this module keep working.
    All arguments are forwarded to `meerschaum.utils.formatting.print_options`
    and its result is returned unchanged.
    """
    ### Imported lazily, inside the function, rather than at module load.
    from meerschaum.utils.formatting import print_options as _formatting_print_options
    result = _formatting_print_options(*args, **kwargs)
    return result
Placeholder function to prevent breaking legacy behavior.
See `meerschaum.utils.formatting.print_options`.