meerschaum.utils.misc
Miscellaneous functions go here
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4""" 5Miscellaneous functions go here 6""" 7 8from __future__ import annotations 9import sys 10from datetime import timedelta, datetime, timezone 11from meerschaum.utils.typing import ( 12 Union, 13 Any, 14 Callable, 15 Optional, 16 List, 17 Dict, 18 SuccessTuple, 19 Iterable, 20 PipesDict, 21 Tuple, 22 InstanceConnector, 23 Hashable, 24 Generator, 25 Iterator, 26 TYPE_CHECKING, 27) 28import meerschaum as mrsm 29if TYPE_CHECKING: 30 import collections 31 32__pdoc__: Dict[str, bool] = { 33 'to_pandas_dtype': False, 34 'filter_unseen_df': False, 35 'add_missing_cols_to_df': False, 36 'parse_df_datetimes': False, 37 'df_from_literal': False, 38 'get_json_cols': False, 39 'get_unhashable_cols': False, 40 'enforce_dtypes': False, 41 'get_datetime_bound_from_df': False, 42 'df_is_chunk_generator': False, 43 'choices_docstring': False, 44 '_get_subaction_names': False, 45} 46 47 48def add_method_to_class( 49 func: Callable[[Any], Any], 50 class_def: 'Class', 51 method_name: Optional[str] = None, 52 keep_self: Optional[bool] = None, 53 ) -> Callable[[Any], Any]: 54 """ 55 Add function `func` to class `class_def`. 56 57 Parameters 58 ---------- 59 func: Callable[[Any], Any] 60 Function to be added as a method of the class 61 62 class_def: Class 63 Class to be modified. 64 65 method_name: Optional[str], default None 66 New name of the method. None will use func.__name__ (default). 67 68 Returns 69 ------- 70 The modified function object. 71 72 """ 73 from functools import wraps 74 75 is_class = isinstance(class_def, type) 76 77 @wraps(func) 78 def wrapper(self, *args, **kw): 79 return func(*args, **kw) 80 81 if method_name is None: 82 method_name = func.__name__ 83 84 setattr(class_def, method_name, ( 85 wrapper if ((is_class and keep_self is None) or keep_self is False) else func 86 ) 87 ) 88 89 return func 90 91 92def generate_password(length: int = 12) -> str: 93 """ 94 Generate a secure password of given length. 95 96 Parameters 97 ---------- 98 length: int, default 12 99 The length of the password. 100 101 Returns 102 ------- 103 A random password string. 104 """ 105 import secrets 106 import string 107 return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length))) 108 109 110def is_int(s: str) -> bool: 111 """ 112 Check if string is an int. 113 114 Parameters 115 ---------- 116 s: str 117 The string to be checked. 118 119 Returns 120 ------- 121 A bool indicating whether the string was able to be cast to an integer. 122 123 """ 124 try: 125 float(s) 126 except Exception: 127 return False 128 129 return float(s).is_integer() 130 131 132def string_to_dict( 133 params_string: str 134) -> Dict[str, Any]: 135 """ 136 Parse a string into a dictionary. 137 138 If the string begins with '{', parse as JSON. Otherwise use simple parsing. 139 140 Parameters 141 ---------- 142 params_string: str 143 The string to be parsed. 144 145 Returns 146 ------- 147 The parsed dictionary. 148 149 Examples 150 -------- 151 >>> string_to_dict("a:1,b:2") 152 {'a': 1, 'b': 2} 153 >>> string_to_dict('{"a": 1, "b": 2}') 154 {'a': 1, 'b': 2} 155 156 """ 157 if params_string == "": 158 return {} 159 160 import json 161 162 ### Kind of a weird edge case. 163 ### In the generated compose file, there is some weird escaping happening, 164 ### so the string to be parsed starts and ends with a single quote. 165 if ( 166 isinstance(params_string, str) 167 and len(params_string) > 4 168 and params_string[1] == "{" 169 and params_string[-2] == "}" 170 ): 171 return json.loads(params_string[1:-1]) 172 if str(params_string).startswith('{'): 173 return json.loads(params_string) 174 175 import ast 176 params_dict = {} 177 for param in params_string.split(","): 178 _keys = param.split(":") 179 keys = _keys[:-1] 180 try: 181 val = ast.literal_eval(_keys[-1]) 182 except Exception: 183 val = str(_keys[-1]) 184 185 c = params_dict 186 for _k in keys[:-1]: 187 try: 188 k = ast.literal_eval(_k) 189 except Exception: 190 k = str(_k) 191 if k not in c: 192 c[k] = {} 193 c = c[k] 194 195 c[keys[-1]] = val 196 197 return params_dict 198 199 200def parse_config_substitution( 201 value: str, 202 leading_key: str = 'MRSM', 203 begin_key: str = '{', 204 end_key: str = '}', 205 delimeter: str = ':' 206) -> List[Any]: 207 """ 208 Parse Meerschaum substitution syntax 209 E.g. MRSM{value1:value2} => ['value1', 'value2'] 210 NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`. 211 """ 212 if not value.beginswith(leading_key): 213 return value 214 215 return leading_key[len(leading_key):][len():-1].split(delimeter) 216 217 218def edit_file( 219 path: Union['pathlib.Path', str], 220 default_editor: str = 'pyvim', 221 debug: bool = False 222) -> bool: 223 """ 224 Open a file for editing. 225 226 Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`. 227 228 Parameters 229 ---------- 230 path: Union[pathlib.Path, str] 231 The path to the file to be edited. 232 233 default_editor: str, default 'pyvim' 234 If `$EDITOR` is not set, use this instead. 235 If `pyvim` is not installed, it will install it from PyPI. 236 237 debug: bool, default False 238 Verbosity toggle. 239 240 Returns 241 ------- 242 A bool indicating the file was successfully edited. 243 """ 244 import os 245 from subprocess import call 246 from meerschaum.utils.debug import dprint 247 from meerschaum.utils.packages import run_python_package, attempt_import, package_venv 248 try: 249 EDITOR = os.environ.get('EDITOR', default_editor) 250 if debug: 251 dprint(f"Opening file '{path}' with editor '{EDITOR}'...") 252 rc = call([EDITOR, path]) 253 except Exception as e: ### can't open with default editors 254 if debug: 255 dprint(str(e)) 256 dprint('Failed to open file with system editor. Falling back to pyvim...') 257 pyvim = attempt_import('pyvim', lazy=False) 258 rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug) 259 return rc == 0 260 261 262def is_pipe_registered( 263 pipe: mrsm.Pipe, 264 pipes: PipesDict, 265 debug: bool = False 266) -> bool: 267 """ 268 Check if a Pipe is inside the pipes dictionary. 269 270 Parameters 271 ---------- 272 pipe: meerschaum.Pipe 273 The pipe to see if it's in the dictionary. 274 275 pipes: PipesDict 276 The dictionary to search inside. 277 278 debug: bool, default False 279 Verbosity toggle. 280 281 Returns 282 ------- 283 A bool indicating whether the pipe is inside the dictionary. 284 """ 285 from meerschaum.utils.debug import dprint 286 ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key 287 if debug: 288 dprint(f'{ck}, {mk}, {lk}') 289 dprint(f'{pipe}, {pipes}') 290 return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk] 291 292 293def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]: 294 """ 295 Determine the columns and lines in the terminal. 296 If they cannot be determined, return the default values (100 columns and 120 lines). 297 298 Parameters 299 ---------- 300 default_cols: int, default 100 301 If the columns cannot be determined, return this value. 302 303 default_lines: int, default 120 304 If the lines cannot be determined, return this value. 305 306 Returns 307 ------- 308 A tuple if integers for the columns and lines. 309 """ 310 import os 311 try: 312 size = os.get_terminal_size() 313 _cols, _lines = size.columns, size.lines 314 except Exception as e: 315 _cols, _lines = ( 316 int(os.environ.get('COLUMNS', str(default_cols))), 317 int(os.environ.get('LINES', str(default_lines))), 318 ) 319 return _cols, _lines 320 321 322def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None): 323 """ 324 Iterate over a list in chunks. 325 https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks 326 327 Parameters 328 ---------- 329 iterable: Iterable[Any] 330 The iterable to iterate over in chunks. 331 332 chunksize: int 333 The size of chunks to iterate with. 334 335 fillvalue: Optional[Any], default None 336 If the chunks do not evenly divide into the iterable, pad the end with this value. 337 338 Returns 339 ------- 340 A generator of tuples of size `chunksize`. 341 342 """ 343 from itertools import zip_longest 344 args = [iter(iterable)] * chunksize 345 return zip_longest(*args, fillvalue=fillvalue) 346 347def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]: 348 """ 349 Sort a dictionary's values and return a new dictionary. 350 351 Parameters 352 ---------- 353 d: Dict[Any, Any] 354 The dictionary to be sorted. 355 356 Returns 357 ------- 358 A sorted dictionary. 359 360 Examples 361 -------- 362 >>> sorted_dict({'b': 1, 'a': 2}) 363 {'b': 1, 'a': 2} 364 >>> sorted_dict({'b': 2, 'a': 1}) 365 {'a': 1, 'b': 2} 366 367 """ 368 try: 369 return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])} 370 except Exception as e: 371 return d 372 373def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]: 374 """ 375 Convert the standard pipes dictionary into a list. 376 377 Parameters 378 ---------- 379 pipes_dict: PipesDict 380 The pipes dictionary to be flattened. 381 382 Returns 383 ------- 384 A list of `Pipe` objects. 385 386 """ 387 pipes_list = [] 388 for ck in pipes_dict.values(): 389 for mk in ck.values(): 390 pipes_list += list(mk.values()) 391 return pipes_list 392 393 394def round_time( 395 dt: Optional[datetime] = None, 396 date_delta: Optional[timedelta] = None, 397 to: 'str' = 'down' 398) -> datetime: 399 """ 400 Round a datetime object to a multiple of a timedelta. 401 http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python 402 403 NOTE: This function strips timezone information! 404 405 Parameters 406 ---------- 407 dt: Optional[datetime], default None 408 If `None`, grab the current UTC datetime. 409 410 date_delta: Optional[timedelta], default None 411 If `None`, use a delta of 1 minute. 412 413 to: 'str', default 'down' 414 Available options are `'up'`, `'down'`, and `'closest'`. 415 416 Returns 417 ------- 418 A rounded `datetime` object. 419 420 Examples 421 -------- 422 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 423 datetime.datetime(2022, 1, 1, 12, 15) 424 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 425 datetime.datetime(2022, 1, 1, 12, 16) 426 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 427 datetime.datetime(2022, 1, 1, 12, 0) 428 >>> round_time( 429 ... datetime(2022, 1, 1, 12, 15, 57, 200), 430 ... timedelta(hours=1), 431 ... to = 'closest' 432 ... ) 433 datetime.datetime(2022, 1, 1, 12, 0) 434 >>> round_time( 435 ... datetime(2022, 1, 1, 12, 45, 57, 200), 436 ... datetime.timedelta(hours=1), 437 ... to = 'closest' 438 ... ) 439 datetime.datetime(2022, 1, 1, 13, 0) 440 441 """ 442 if date_delta is None: 443 date_delta = timedelta(minutes=1) 444 round_to = date_delta.total_seconds() 445 if dt is None: 446 dt = datetime.now(timezone.utc).replace(tzinfo=None) 447 seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds 448 449 if seconds % round_to == 0 and dt.microsecond == 0: 450 rounding = (seconds + round_to / 2) // round_to * round_to 451 else: 452 if to == 'up': 453 rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to 454 elif to == 'down': 455 rounding = seconds // round_to * round_to 456 else: 457 rounding = (seconds + round_to / 2) // round_to * round_to 458 459 return dt + timedelta(0, rounding - seconds, - dt.microsecond) 460 461 462def timed_input( 463 seconds: int = 10, 464 timeout_message: str = "", 465 prompt: str = "", 466 icon: bool = False, 467 **kw 468 ) -> Union[str, None]: 469 """ 470 Accept user input only for a brief period of time. 471 472 Parameters 473 ---------- 474 seconds: int, default 10 475 The number of seconds to wait. 476 477 timeout_message: str, default '' 478 The message to print after the window has elapsed. 479 480 prompt: str, default '' 481 The prompt to print during the window. 482 483 icon: bool, default False 484 If `True`, print the configured input icon. 485 486 487 Returns 488 ------- 489 The input string entered by the user. 490 491 """ 492 import signal, time 493 494 class TimeoutExpired(Exception): 495 """Raise this exception when the timeout is reached.""" 496 497 def alarm_handler(signum, frame): 498 raise TimeoutExpired 499 500 # set signal handler 501 signal.signal(signal.SIGALRM, alarm_handler) 502 signal.alarm(seconds) # produce SIGALRM in `timeout` seconds 503 504 try: 505 return input(prompt) 506 except TimeoutExpired: 507 return None 508 except (EOFError, RuntimeError): 509 try: 510 print(prompt) 511 time.sleep(seconds) 512 except TimeoutExpired: 513 return None 514 finally: 515 signal.alarm(0) # cancel alarm 516 517 518 519 520 521def replace_pipes_in_dict( 522 pipes : Optional[PipesDict] = None, 523 func: 'function' = str, 524 debug: bool = False, 525 **kw 526 ) -> PipesDict: 527 """ 528 Replace the Pipes in a Pipes dict with the result of another function. 529 530 Parameters 531 ---------- 532 pipes: Optional[PipesDict], default None 533 The pipes dict to be processed. 534 535 func: Callable[[Any], Any], default str 536 The function to be applied to every pipe. 537 Defaults to the string constructor. 538 539 debug: bool, default False 540 Verbosity toggle. 541 542 543 Returns 544 ------- 545 A dictionary where every pipe is replaced with the output of a function. 546 547 """ 548 import copy 549 def change_dict(d : Dict[Any, Any], func : 'function') -> None: 550 for k, v in d.items(): 551 if isinstance(v, dict): 552 change_dict(v, func) 553 else: 554 d[k] = func(v) 555 556 if pipes is None: 557 from meerschaum import get_pipes 558 pipes = get_pipes(debug=debug, **kw) 559 560 result = copy.deepcopy(pipes) 561 change_dict(result, func) 562 return result 563 564def enforce_gevent_monkey_patch(): 565 """ 566 Check if gevent monkey patching is enabled, and if not, then apply patching. 567 """ 568 from meerschaum.utils.packages import attempt_import 569 import socket 570 gevent, gevent_socket, gevent_monkey = attempt_import( 571 'gevent', 'gevent.socket', 'gevent.monkey' 572 ) 573 if not socket.socket is gevent_socket.socket: 574 gevent_monkey.patch_all() 575 576def is_valid_email(email: str) -> Union['re.Match', None]: 577 """ 578 Check whether a string is a valid email. 579 580 Parameters 581 ---------- 582 email: str 583 The string to be examined. 584 585 Returns 586 ------- 587 None if a string is not in email format, otherwise a `re.Match` object, which is truthy. 588 589 Examples 590 -------- 591 >>> is_valid_email('foo') 592 >>> is_valid_email('foo@foo.com') 593 <re.Match object; span=(0, 11), match='foo@foo.com'> 594 595 """ 596 import re 597 regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$' 598 return re.search(regex, email) 599 600 601def string_width(string: str, widest: bool = True) -> int: 602 """ 603 Calculate the width of a string, either by its widest or last line. 604 605 Parameters 606 ---------- 607 string: str: 608 The string to be examined. 609 610 widest: bool, default True 611 No longer used because `widest` is always assumed to be true. 612 613 Returns 614 ------- 615 An integer for the text's visual width. 616 617 Examples 618 -------- 619 >>> string_width('a') 620 1 621 >>> string_width('a\\nbc\\nd') 622 2 623 624 """ 625 def _widest(): 626 words = string.split('\n') 627 max_length = 0 628 for w in words: 629 length = len(w) 630 if length > max_length: 631 max_length = length 632 return max_length 633 634 return _widest() 635 636def _pyinstaller_traverse_dir( 637 directory: str, 638 ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), 639 include_dotfiles: bool = False 640 ) -> list: 641 """ 642 Recursively traverse a directory and return a list of its contents. 643 """ 644 import os, pathlib 645 paths = [] 646 _directory = pathlib.Path(directory) 647 648 def _found_pattern(name: str): 649 for pattern in ignore_patterns: 650 if pattern.replace('/', os.path.sep) in str(name): 651 return True 652 return False 653 654 for root, dirs, files in os.walk(_directory): 655 _root = str(root)[len(str(_directory.parent)):] 656 if _root.startswith(os.path.sep): 657 _root = _root[len(os.path.sep):] 658 if _root.startswith('.') and not include_dotfiles: 659 continue 660 ### ignore certain patterns 661 if _found_pattern(_root): 662 continue 663 664 for filename in files: 665 if filename.startswith('.') and not include_dotfiles: 666 continue 667 path = os.path.join(root, filename) 668 if _found_pattern(path): 669 continue 670 671 _path = str(path)[len(str(_directory.parent)):] 672 if _path.startswith(os.path.sep): 673 _path = _path[len(os.path.sep):] 674 _path = os.path.sep.join(_path.split(os.path.sep)[:-1]) 675 676 paths.append((path, _path)) 677 return paths 678 679 680def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]: 681 """ 682 Recursively replace passwords in a dictionary. 683 684 Parameters 685 ---------- 686 d: Dict[str, Any] 687 The dictionary to search through. 688 689 replace_with: str, default '*' 690 The string to replace each character of the password with. 691 692 Returns 693 ------- 694 Another dictionary where values to the keys `'password'` 695 are replaced with `replace_with` (`'*'`). 696 697 Examples 698 -------- 699 >>> replace_password({'a': 1}) 700 {'a': 1} 701 >>> replace_password({'password': '123'}) 702 {'password': '***'} 703 >>> replace_password({'nested': {'password': '123'}}) 704 {'nested': {'password': '***'}} 705 >>> replace_password({'password': '123'}, replace_with='!') 706 {'password': '!!!'} 707 708 """ 709 import copy 710 _d = copy.deepcopy(d) 711 for k, v in d.items(): 712 if isinstance(v, dict): 713 _d[k] = replace_password(v) 714 elif 'password' in str(k).lower(): 715 _d[k] = ''.join([replace_with for char in str(v)]) 716 elif str(k).lower() == 'uri': 717 from meerschaum.connectors.sql import SQLConnector 718 try: 719 uri_params = SQLConnector.parse_uri(v) 720 except Exception as e: 721 uri_params = None 722 if not uri_params: 723 continue 724 if not 'username' in uri_params or not 'password' in uri_params: 725 continue 726 _d[k] = v.replace( 727 uri_params['username'] + ':' + uri_params['password'], 728 uri_params['username'] + ':' + ''.join( 729 [replace_with for char in str(uri_params['password'])] 730 ) 731 ) 732 return _d 733 734 735def filter_arguments( 736 func: Callable[[Any], Any], 737 *args: Any, 738 **kwargs: Any 739) -> Tuple[Tuple[Any], Dict[str, Any]]: 740 """ 741 Filter out unsupported positional and keyword arguments. 742 743 Parameters 744 ---------- 745 func: Callable[[Any], Any] 746 The function to inspect. 747 748 *args: Any 749 Positional arguments to filter and pass to `func`. 750 751 **kwargs 752 Keyword arguments to filter and pass to `func`. 753 754 Returns 755 ------- 756 The `args` and `kwargs` accepted by `func`. 757 """ 758 args = filter_positionals(func, *args) 759 kwargs = filter_keywords(func, **kwargs) 760 return args, kwargs 761 762 763def filter_keywords( 764 func: Callable[[Any], Any], 765 **kw: Any 766) -> Dict[str, Any]: 767 """ 768 Filter out unsupported keyword arguments. 769 770 Parameters 771 ---------- 772 func: Callable[[Any], Any] 773 The function to inspect. 774 775 **kw: Any 776 The arguments to be filtered and passed into `func`. 777 778 Returns 779 ------- 780 A dictionary of keyword arguments accepted by `func`. 781 782 Examples 783 -------- 784 ```python 785 >>> def foo(a=1, b=2): 786 ... return a * b 787 >>> filter_keywords(foo, a=2, b=4, c=6) 788 {'a': 2, 'b': 4} 789 >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6})) 790 8 791 ``` 792 793 """ 794 import inspect 795 func_params = inspect.signature(func).parameters 796 ### If the function has a **kw method, skip filtering. 797 for param, _type in func_params.items(): 798 if '**' in str(_type): 799 return kw 800 return {k: v for k, v in kw.items() if k in func_params} 801 802 803def filter_positionals( 804 func: Callable[[Any], Any], 805 *args: Any 806) -> Tuple[Any]: 807 """ 808 Filter out unsupported positional arguments. 809 810 Parameters 811 ---------- 812 func: Callable[[Any], Any] 813 The function to inspect. 814 815 *args: Any 816 The arguments to be filtered and passed into `func`. 817 NOTE: If the function signature expects more arguments than provided, 818 the missing slots will be filled with `None`. 819 820 Returns 821 ------- 822 A tuple of positional arguments accepted by `func`. 823 824 Examples 825 -------- 826 ```python 827 >>> def foo(a, b): 828 ... return a * b 829 >>> filter_positionals(foo, 2, 4, 6) 830 (2, 4) 831 >>> foo(*filter_positionals(foo, 2, 4, 6)) 832 8 833 ``` 834 835 """ 836 import inspect 837 from meerschaum.utils.warnings import warn 838 func_params = inspect.signature(func).parameters 839 acceptable_args: List[Any] = [] 840 841 def _warn_invalids(_num_invalid): 842 if _num_invalid > 0: 843 warn( 844 "Too few arguments were provided. " 845 + f"{_num_invalid} argument" 846 + ('s have ' if _num_invalid != 1 else " has ") 847 + " been filled with `None`.", 848 ) 849 850 num_invalid: int = 0 851 for i, (param, val) in enumerate(func_params.items()): 852 if '=' in str(val) or '*' in str(val): 853 _warn_invalids(num_invalid) 854 return tuple(acceptable_args) 855 856 try: 857 acceptable_args.append(args[i]) 858 except IndexError: 859 acceptable_args.append(None) 860 num_invalid += 1 861 862 _warn_invalids(num_invalid) 863 return tuple(acceptable_args) 864 865 866def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]: 867 """ 868 Convert an ordered dict to a dict. 869 Does not mutate the original OrderedDict. 870 """ 871 from collections import OrderedDict 872 _d = dict(od) 873 for k, v in od.items(): 874 if isinstance(v, OrderedDict) or ( 875 issubclass(type(v), OrderedDict) 876 ): 877 _d[k] = dict_from_od(v) 878 return _d 879 880def remove_ansi(s: str) -> str: 881 """ 882 Remove ANSI escape characters from a string. 883 884 Parameters 885 ---------- 886 s: str: 887 The string to be cleaned. 888 889 Returns 890 ------- 891 A string with the ANSI characters removed. 892 893 Examples 894 -------- 895 >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m") 896 'Hello, World!' 897 898 """ 899 import re 900 return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s) 901 902 903def get_connector_labels( 904 *types: str, 905 search_term: str = '', 906 ignore_exact_match = True, 907 _additional_options: Optional[List[str]] = None, 908) -> List[str]: 909 """ 910 Read connector labels from the configuration dictionary. 911 912 Parameters 913 ---------- 914 *types: str 915 The connector types. 916 If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`. 917 918 search_term: str, default '' 919 A filter on the connectors' labels. 920 921 ignore_exact_match: bool, default True 922 If `True`, skip a connector if the search_term is an exact match. 923 924 Returns 925 ------- 926 A list of the keys of defined connectors. 927 928 """ 929 from meerschaum.config import get_config 930 connectors = get_config('meerschaum', 'connectors') 931 932 _types = list(types) 933 if len(_types) == 0: 934 _types = list(connectors.keys()) + ['plugin'] 935 936 conns = [] 937 for t in _types: 938 if t == 'plugin': 939 from meerschaum.plugins import get_data_plugins 940 conns += [ 941 f'{t}:' + plugin.module.__name__.split('.')[-1] 942 for plugin in get_data_plugins() 943 ] 944 continue 945 conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ] 946 947 if _additional_options: 948 conns += _additional_options 949 950 possibilities = [ 951 c 952 for c in conns 953 if c.startswith(search_term) 954 and c != ( 955 search_term if ignore_exact_match else '' 956 ) 957 ] 958 return sorted(possibilities) 959 960 961def wget( 962 url: str, 963 dest: Optional[Union[str, 'pathlib.Path']] = None, 964 headers: Optional[Dict[str, Any]] = None, 965 color: bool = True, 966 debug: bool = False, 967 **kw: Any 968) -> 'pathlib.Path': 969 """ 970 Mimic `wget` with `requests`. 971 972 Parameters 973 ---------- 974 url: str 975 The URL to the resource to be downloaded. 976 977 dest: Optional[Union[str, pathlib.Path]], default None 978 The destination path of the downloaded file. 979 If `None`, save to the current directory. 980 981 color: bool, default True 982 If `debug` is `True`, print color output. 983 984 debug: bool, default False 985 Verbosity toggle. 986 987 Returns 988 ------- 989 The path to the downloaded file. 990 991 """ 992 from meerschaum.utils.warnings import warn, error 993 from meerschaum.utils.debug import dprint 994 import os, pathlib, re, urllib.request 995 if headers is None: 996 headers = {} 997 request = urllib.request.Request(url, headers=headers) 998 if not color: 999 dprint = print 1000 if debug: 1001 dprint(f"Downloading from '{url}'...") 1002 try: 1003 response = urllib.request.urlopen(request) 1004 except Exception as e: 1005 import ssl 1006 ssl._create_default_https_context = ssl._create_unverified_context 1007 try: 1008 response = urllib.request.urlopen(request) 1009 except Exception as _e: 1010 print(_e) 1011 response = None 1012 if response is None or response.code != 200: 1013 error_msg = f"Failed to download from '{url}'." 1014 if color: 1015 error(error_msg) 1016 else: 1017 print(error_msg) 1018 import sys 1019 sys.exit(1) 1020 1021 d = response.headers.get('content-disposition', None) 1022 fname = ( 1023 re.findall("filename=(.+)", d)[0].strip('"') if d is not None 1024 else url.split('/')[-1] 1025 ) 1026 1027 if dest is None: 1028 dest = pathlib.Path(os.path.join(os.getcwd(), fname)) 1029 elif isinstance(dest, str): 1030 dest = pathlib.Path(dest) 1031 1032 with open(dest, 'wb') as f: 1033 f.write(response.fp.read()) 1034 1035 if debug: 1036 dprint(f"Downloaded file '{dest}'.") 1037 1038 return dest 1039 1040 1041def async_wrap(func): 1042 """ 1043 Run a synchronous function as async. 1044 https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn 1045 """ 1046 import asyncio 1047 from functools import wraps, partial 1048 1049 @wraps(func) 1050 async def run(*args, loop=None, executor=None, **kwargs): 1051 if loop is None: 1052 loop = asyncio.get_event_loop() 1053 pfunc = partial(func, *args, **kwargs) 1054 return await loop.run_in_executor(executor, pfunc) 1055 return run 1056 1057 1058def debug_trace(browser: bool = True): 1059 """ 1060 Open a web-based debugger to trace the execution of the program. 1061 1062 This is an alias import for `meerschaum.utils.debug.debug_trace`. 1063 """ 1064 from meerschaum.utils.debug import trace 1065 trace(browser=browser) 1066 1067 1068def items_str( 1069 items: List[Any], 1070 quotes: bool = True, 1071 quote_str: str = "'", 1072 commas: bool = True, 1073 comma_str: str = ',', 1074 and_: bool = True, 1075 and_str: str = 'and', 1076 oxford_comma: bool = True, 1077 spaces: bool = True, 1078 space_str = ' ', 1079) -> str: 1080 """ 1081 Return a formatted string if list items separated by commas. 1082 1083 Parameters 1084 ---------- 1085 items: [List[Any]] 1086 The items to be printed as an English list. 1087 1088 quotes: bool, default True 1089 If `True`, wrap items in quotes. 1090 1091 quote_str: str, default "'" 1092 If `quotes` is `True`, prepend and append each item with this string. 1093 1094 and_: bool, default True 1095 If `True`, include the word 'and' before the final item in the list. 1096 1097 and_str: str, default 'and' 1098 If `and_` is True, insert this string where 'and' normally would in and English list. 1099 1100 oxford_comma: bool, default True 1101 If `True`, include the Oxford Comma (comma before the final 'and'). 1102 Only applies when `and_` is `True`. 1103 1104 spaces: bool, default True 1105 If `True`, separate items with `space_str` 1106 1107 space_str: str, default ' ' 1108 If `spaces` is `True`, separate items with this string. 1109 1110 Returns 1111 ------- 1112 A string of the items as an English list. 1113 1114 Examples 1115 -------- 1116 >>> items_str([1,2,3]) 1117 "'1', '2', and '3'" 1118 >>> items_str([1,2,3], quotes=False) 1119 '1, 2, and 3' 1120 >>> items_str([1,2,3], and_=False) 1121 "'1', '2', '3'" 1122 >>> items_str([1,2,3], spaces=False, and_=False) 1123 "'1','2','3'" 1124 >>> items_str([1,2,3], oxford_comma=False) 1125 "'1', '2' and '3'" 1126 >>> items_str([1,2,3], quote_str=":") 1127 ':1:, :2:, and :3:' 1128 >>> items_str([1,2,3], and_str="or") 1129 "'1', '2', or '3'" 1130 >>> items_str([1,2,3], space_str="_") 1131 "'1',_'2',_and_'3'" 1132 1133 """ 1134 if not items: 1135 return '' 1136 1137 q = quote_str if quotes else '' 1138 s = space_str if spaces else '' 1139 a = and_str if and_ else '' 1140 c = comma_str if commas else '' 1141 1142 if len(items) == 1: 1143 return q + str(list(items)[0]) + q 1144 1145 if len(items) == 2: 1146 return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q 1147 1148 sep = q + c + s + q 1149 output = q + sep.join(str(i) for i in items[:-1]) + q 1150 if oxford_comma: 1151 output += c 1152 output += s + a + (s if and_ else '') + q + str(items[-1]) + q 1153 return output 1154 1155 1156def interval_str(delta: Union[timedelta, int]) -> str: 1157 """ 1158 Return a human-readable string for a `timedelta` (or `int` minutes). 1159 1160 Parameters 1161 ---------- 1162 delta: Union[timedelta, int] 1163 The interval to print. If `delta` is an integer, assume it corresponds to minutes. 1164 1165 Returns 1166 ------- 1167 A formatted string, fit for human eyes. 1168 """ 1169 from meerschaum.utils.packages import attempt_import 1170 if is_int(delta): 1171 return str(delta) 1172 humanfriendly = attempt_import('humanfriendly') 1173 delta_seconds = ( 1174 delta.total_seconds() 1175 if isinstance(delta, timedelta) 1176 else (delta * 60) 1177 ) 1178 return humanfriendly.format_timespan(delta_seconds) 1179 1180 1181def is_docker_available() -> bool: 1182 """Check if we can connect to the Docker engine.""" 1183 import subprocess 1184 try: 1185 has_docker = subprocess.call( 1186 ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1187 ) == 0 1188 except Exception: 1189 has_docker = False 1190 return has_docker 1191 1192 1193def is_android() -> bool: 1194 """Return `True` if the current platform is Android.""" 1195 import sys 1196 return hasattr(sys, 'getandroidapilevel') 1197 1198 1199def is_bcp_available() -> bool: 1200 """Check if the MSSQL `bcp` utility is installed.""" 1201 import subprocess 1202 1203 try: 1204 has_bcp = subprocess.call( 1205 ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1206 ) == 0 1207 except Exception: 1208 has_bcp = False 1209 return has_bcp 1210 1211 1212def is_systemd_available() -> bool: 1213 """Check if running on systemd.""" 1214 import subprocess 1215 try: 1216 has_systemctl = subprocess.call( 1217 ['systemctl', '-h'], 1218 stdout=subprocess.DEVNULL, 1219 stderr=subprocess.STDOUT, 1220 ) == 0 1221 except Exception: 1222 has_systemctl = False 1223 return has_systemctl 1224 1225 1226def is_tmux_available() -> bool: 1227 """ 1228 Check if `tmux` is installed. 1229 """ 1230 import subprocess 1231 try: 1232 has_tmux = subprocess.call( 1233 ['tmux', '-V'], 1234 stdout=subprocess.DEVNULL, 1235 stderr=subprocess.STDOUT 1236 ) == 0 1237 except Exception: 1238 has_tmux = False 1239 return has_tmux 1240 1241def get_last_n_lines(file_name: str, N: int): 1242 """ 1243 https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/ 1244 """ 1245 import os 1246 # Create an empty list to keep the track of last N lines 1247 list_of_lines = [] 1248 # Open file for reading in binary mode 1249 with open(file_name, 'rb') as read_obj: 1250 # Move the cursor to the end of the file 1251 read_obj.seek(0, os.SEEK_END) 1252 # Create a buffer to keep the last read line 1253 buffer = bytearray() 1254 # Get the current position of pointer i.e eof 1255 pointer_location = read_obj.tell() 1256 # Loop till pointer reaches the top of the file 1257 while pointer_location >= 0: 1258 # Move the file pointer to the location pointed by pointer_location 1259 read_obj.seek(pointer_location) 1260 # Shift pointer location by -1 1261 pointer_location = pointer_location -1 1262 # read that byte / character 1263 new_byte = read_obj.read(1) 1264 # If the read byte is new line character then it means one line is read 1265 if new_byte == b'\n': 1266 # Save the line in list of lines 1267 list_of_lines.append(buffer.decode()[::-1]) 1268 # If the size of list reaches N, then return the reversed list 1269 if len(list_of_lines) == N: 1270 return list(reversed(list_of_lines)) 1271 # Reinitialize the byte array to save next line 1272 buffer = bytearray() 1273 else: 1274 # If last read character is not eol then add it in buffer 1275 buffer.extend(new_byte) 1276 # As file is read completely, if there is still data in buffer, then its first line. 1277 if len(buffer) > 0: 1278 list_of_lines.append(buffer.decode()[::-1]) 1279 # return the reversed list 1280 return list(reversed(list_of_lines)) 1281 1282 1283def tail(f, n, offset=None): 1284 """ 1285 https://stackoverflow.com/a/692616/9699829 1286 1287 Reads n lines from f with an offset of offset lines. The return 1288 value is a tuple in the form ``(lines, has_more)`` where `has_more` is 1289 an indicator that is `True` if there are more lines in the file. 1290 """ 1291 avg_line_length = 74 1292 to_read = n + (offset or 0) 1293 1294 while True: 1295 try: 1296 f.seek(-(avg_line_length * to_read), 2) 1297 except IOError: 1298 # woops. apparently file is smaller than what we want 1299 # to step back, go to the beginning instead 1300 f.seek(0) 1301 pos = f.tell() 1302 lines = f.read().splitlines() 1303 if len(lines) >= to_read or pos == 0: 1304 return lines[-to_read:offset and -offset or None], \ 1305 len(lines) > to_read or pos > 0 1306 avg_line_length *= 1.3 1307 1308 1309def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str: 1310 """ 1311 Remove characters from each section of a string until the length is within the limit. 1312 1313 Parameters 1314 ---------- 1315 item: str 1316 The item name to be truncated. 1317 1318 delimeter: str, default '_' 1319 Split `item` by this string into several sections. 1320 1321 max_len: int, default 128 1322 The max acceptable length of the truncated version of `item`. 1323 1324 Returns 1325 ------- 1326 The truncated string. 1327 1328 Examples 1329 -------- 1330 >>> truncate_string_sections('abc_def_ghi', max_len=10) 1331 'ab_de_gh' 1332 1333 """ 1334 if len(item) < max_len: 1335 return item 1336 1337 def _shorten(s: str) -> str: 1338 return s[:-1] if len(s) > 1 else s 1339 1340 sections = list(enumerate(item.split('_'))) 1341 sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1]))) 1342 available_chars = max_len - len(sections) 1343 1344 _sections = [(i, s) for i, s in sorted_sections] 1345 _sections_len = sum([len(s) for i, s in _sections]) 1346 _old_sections_len = _sections_len 1347 while _sections_len > available_chars: 1348 _sections = [(i, _shorten(s)) for i, s in _sections] 1349 _old_sections_len = _sections_len 1350 _sections_len = sum([len(s) for i, s in _sections]) 1351 if _old_sections_len == _sections_len: 1352 raise Exception(f"String could not be truncated: '{item}'") 1353 1354 new_sections = sorted(_sections, key=lambda x: x[0]) 1355 return delimeter.join([s for i, s in new_sections]) 1356 1357 1358def separate_negation_values( 1359 vals: Union[List[str], Tuple[str]], 1360 negation_prefix: Optional[str] = None, 1361) -> Tuple[List[str], List[str]]: 1362 """ 1363 Separate the negated values from the positive ones. 1364 Return two lists: positive and negative values. 1365 1366 Parameters 1367 ---------- 1368 vals: Union[List[str], Tuple[str]] 1369 A list of strings to parse. 1370 1371 negation_prefix: Optional[str], default None 1372 Include values that start with this string in the second list. 1373 If `None`, use the system default (`_`). 1374 """ 1375 if negation_prefix is None: 1376 from meerschaum.config.static import STATIC_CONFIG 1377 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 1378 _in_vals, _ex_vals = [], [] 1379 for v in vals: 1380 if str(v).startswith(negation_prefix): 1381 _ex_vals.append(str(v)[len(negation_prefix):]) 1382 else: 1383 _in_vals.append(v) 1384 1385 return _in_vals, _ex_vals 1386 1387 1388def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]: 1389 """ 1390 Translate a params dictionary into lists of include- and exclude-values. 1391 1392 Parameters 1393 ---------- 1394 params: Optional[Dict[str, Any]] 1395 A params query dictionary. 1396 1397 Returns 1398 ------- 1399 A dictionary mapping keys to a tuple of lists for include and exclude values. 1400 1401 Examples 1402 -------- 1403 >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']}) 1404 {'a': (['b', 'c', 'e'], ['d', 'f'])} 1405 """ 1406 if not params: 1407 return {} 1408 return { 1409 col: separate_negation_values( 1410 ( 1411 val 1412 if isinstance(val, (list, tuple)) 1413 else [val] 1414 ) 1415 ) 1416 for col, val in params.items() 1417 } 1418 1419 1420def flatten_list(list_: List[Any]) -> List[Any]: 1421 """ 1422 Recursively flatten a list. 1423 """ 1424 for item in list_: 1425 if isinstance(item, list): 1426 yield from flatten_list(item) 1427 else: 1428 yield item 1429 1430 1431def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]: 1432 """ 1433 Parse a string containing the text to be passed into a function 1434 and return a tuple of args, kwargs. 1435 1436 Parameters 1437 ---------- 1438 args_str: str 1439 The contents of the function parameter (as a string). 1440 1441 Returns 1442 ------- 1443 A tuple of args (tuple) and kwargs (dict[str, Any]). 1444 1445 Examples 1446 -------- 1447 >>> parse_arguments_str('123, 456, foo=789, bar="baz"') 1448 (123, 456), {'foo': 789, 'bar': 'baz'} 1449 """ 1450 import ast 1451 args = [] 1452 kwargs = {} 1453 1454 for part in args_str.split(','): 1455 if '=' in part: 1456 key, val = part.split('=', 1) 1457 kwargs[key.strip()] = ast.literal_eval(val) 1458 else: 1459 args.append(ast.literal_eval(part.strip())) 1460 1461 return tuple(args), kwargs 1462 1463 1464def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple: 1465 """ 1466 Wrap around `pathlib.Path.symlink_to`, but add support for Windows. 1467 1468 Parameters 1469 ---------- 1470 src_path: pathlib.Path 1471 The source path. 1472 1473 dest_path: pathlib.Path 1474 The destination path. 1475 1476 Returns 1477 ------- 1478 A SuccessTuple indicating success. 1479 """ 1480 if dest_path.exists() and dest_path.resolve() == src_path.resolve(): 1481 return True, "Symlink already exists." 1482 try: 1483 dest_path.symlink_to(src_path) 1484 success = True 1485 except Exception as e: 1486 success = False 1487 msg = str(e) 1488 if success: 1489 return success, "Success" 1490 1491 ### Failed to create a symlink. 1492 ### If we're not on Windows, return an error. 1493 import platform 1494 if platform.system() != 'Windows': 1495 return success, msg 1496 1497 try: 1498 import _winapi 1499 except ImportError: 1500 return False, "Unable to import _winapi." 1501 1502 if src_path.is_dir(): 1503 try: 1504 _winapi.CreateJunction(str(src_path), str(dest_path)) 1505 except Exception as e: 1506 return False, str(e) 1507 return True, "Success" 1508 1509 ### Last resort: copy the file on Windows. 1510 import shutil 1511 try: 1512 shutil.copy(src_path, dest_path) 1513 except Exception as e: 1514 return False, str(e) 1515 1516 return True, "Success" 1517 1518 1519def is_symlink(path: pathlib.Path) -> bool: 1520 """ 1521 Wrap `path.is_symlink()` but add support for Windows junctions. 1522 """ 1523 if path.is_symlink(): 1524 return True 1525 import platform 1526 import os 1527 if platform.system() != 'Windows': 1528 return False 1529 try: 1530 return bool(os.readlink(path)) 1531 except OSError: 1532 return False 1533 1534 1535def parametrized(dec): 1536 """ 1537 A meta-decorator for allowing other decorator functions to have parameters. 1538 1539 https://stackoverflow.com/a/26151604/9699829 1540 """ 1541 def layer(*args, **kwargs): 1542 def repl(f): 1543 return dec(f, *args, **kwargs) 1544 return repl 1545 return layer 1546 1547 1548def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None: 1549 """ 1550 Safely extract a TAR file to a give directory. 1551 This defends against CVE-2007-4559. 1552 1553 Parameters 1554 ---------- 1555 tarf: file 1556 The TAR file opened with `tarfile.open(path, 'r:gz')`. 1557 1558 output_dir: Union[str, pathlib.Path] 1559 The output directory. 1560 """ 1561 import os 1562 1563 def is_within_directory(directory, target): 1564 abs_directory = os.path.abspath(directory) 1565 abs_target = os.path.abspath(target) 1566 prefix = os.path.commonprefix([abs_directory, abs_target]) 1567 return prefix == abs_directory 1568 1569 def safe_extract(tar, path=".", members=None, *, numeric_owner=False): 1570 for member in tar.getmembers(): 1571 member_path = os.path.join(path, member.name) 1572 if not is_within_directory(path, member_path): 1573 raise Exception("Attempted Path Traversal in Tar File") 1574 1575 tar.extractall(path=path, members=members, numeric_owner=numeric_owner) 1576 1577 return safe_extract(tarf, output_dir) 1578 1579 1580################## 1581# Legacy imports # 1582################## 1583 1584def choose_subaction(*args, **kwargs) -> Any: 1585 """ 1586 Placeholder function to prevent breaking legacy behavior. 1587 See `meerschaum.actions.choose_subaction`. 1588 """ 1589 from meerschaum.actions import choose_subaction as _choose_subactions 1590 return _choose_subactions(*args, **kwargs) 1591 1592 1593def print_options(*args, **kwargs) -> None: 1594 """ 1595 Placeholder function to prevent breaking legacy behavior. 1596 See `meerschaum.utils.formatting.print_options`. 1597 """ 1598 from meerschaum.utils.formatting import print_options as _print_options 1599 return _print_options(*args, **kwargs) 1600 1601 1602def to_pandas_dtype(*args, **kwargs) -> Any: 1603 """ 1604 Placeholder function to prevent breaking legacy behavior. 1605 See `meerschaum.utils.dtypes.to_pandas_dtype`. 1606 """ 1607 from meerschaum.utils.dtypes import to_pandas_dtype as _to_pandas_dtype 1608 return _to_pandas_dtype(*args, **kwargs) 1609 1610 1611def filter_unseen_df(*args, **kwargs) -> Any: 1612 """ 1613 Placeholder function to prevent breaking legacy behavior. 1614 See `meerschaum.utils.dataframe.filter_unseen_df`. 1615 """ 1616 from meerschaum.utils.dataframe import filter_unseen_df as real_function 1617 return real_function(*args, **kwargs) 1618 1619 1620def add_missing_cols_to_df(*args, **kwargs) -> Any: 1621 """ 1622 Placeholder function to prevent breaking legacy behavior. 1623 See `meerschaum.utils.dataframe.add_missing_cols_to_df`. 1624 """ 1625 from meerschaum.utils.dataframe import add_missing_cols_to_df as real_function 1626 return real_function(*args, **kwargs) 1627 1628 1629def parse_df_datetimes(*args, **kwargs) -> Any: 1630 """ 1631 Placeholder function to prevent breaking legacy behavior. 1632 See `meerschaum.utils.dataframe.parse_df_datetimes`. 1633 """ 1634 from meerschaum.utils.dataframe import parse_df_datetimes as real_function 1635 return real_function(*args, **kwargs) 1636 1637 1638def df_from_literal(*args, **kwargs) -> Any: 1639 """ 1640 Placeholder function to prevent breaking legacy behavior. 1641 See `meerschaum.utils.dataframe.df_from_literal`. 1642 """ 1643 from meerschaum.utils.dataframe import df_from_literal as real_function 1644 return real_function(*args, **kwargs) 1645 1646 1647def get_json_cols(*args, **kwargs) -> Any: 1648 """ 1649 Placeholder function to prevent breaking legacy behavior. 1650 See `meerschaum.utils.dataframe.get_json_cols`. 1651 """ 1652 from meerschaum.utils.dataframe import get_json_cols as real_function 1653 return real_function(*args, **kwargs) 1654 1655 1656def get_unhashable_cols(*args, **kwargs) -> Any: 1657 """ 1658 Placeholder function to prevent breaking legacy behavior. 1659 See `meerschaum.utils.dataframe.get_unhashable_cols`. 1660 """ 1661 from meerschaum.utils.dataframe import get_unhashable_cols as real_function 1662 return real_function(*args, **kwargs) 1663 1664 1665def enforce_dtypes(*args, **kwargs) -> Any: 1666 """ 1667 Placeholder function to prevent breaking legacy behavior. 1668 See `meerschaum.utils.dataframe.enforce_dtypes`. 1669 """ 1670 from meerschaum.utils.dataframe import enforce_dtypes as real_function 1671 return real_function(*args, **kwargs) 1672 1673 1674def get_datetime_bound_from_df(*args, **kwargs) -> Any: 1675 """ 1676 Placeholder function to prevent breaking legacy behavior. 1677 See `meerschaum.utils.dataframe.get_datetime_bound_from_df`. 1678 """ 1679 from meerschaum.utils.dataframe import get_datetime_bound_from_df as real_function 1680 return real_function(*args, **kwargs) 1681 1682 1683def df_is_chunk_generator(*args, **kwargs) -> Any: 1684 """ 1685 Placeholder function to prevent breaking legacy behavior. 1686 See `meerschaum.utils.dataframe.df_is_chunk_generator`. 1687 """ 1688 from meerschaum.utils.dataframe import df_is_chunk_generator as real_function 1689 return real_function(*args, **kwargs) 1690 1691 1692def choices_docstring(*args, **kwargs) -> Any: 1693 """ 1694 Placeholder function to prevent breaking legacy behavior. 1695 See `meerschaum.actions.choices_docstring`. 1696 """ 1697 from meerschaum.actions import choices_docstring as real_function 1698 return real_function(*args, **kwargs) 1699 1700 1701def _get_subaction_names(*args, **kwargs) -> Any: 1702 """ 1703 Placeholder function to prevent breaking legacy behavior. 1704 See `meerschaum.actions._get_subaction_names`. 1705 """ 1706 from meerschaum.actions import _get_subaction_names as real_function 1707 return real_function(*args, **kwargs) 1708 1709 1710def json_serialize_datetime(dt: datetime) -> Union[str, None]: 1711 """ 1712 Serialize a datetime object into JSON (ISO format string). 1713 1714 Examples 1715 -------- 1716 >>> import json 1717 >>> from datetime import datetime 1718 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 1719 '{"a": "2022-01-01T00:00:00Z"}' 1720 1721 """ 1722 from meerschaum.utils.dtypes import serialize_datetime 1723 return serialize_datetime(dt) 1724 1725 1726_current_module = sys.modules[__name__] 1727__all__ = tuple( 1728 name 1729 for name, obj in globals().items() 1730 if callable(obj) 1731 and name not in __pdoc__ 1732 and getattr(obj, '__module__', None) == _current_module.__name__ 1733)
49def add_method_to_class( 50 func: Callable[[Any], Any], 51 class_def: 'Class', 52 method_name: Optional[str] = None, 53 keep_self: Optional[bool] = None, 54 ) -> Callable[[Any], Any]: 55 """ 56 Add function `func` to class `class_def`. 57 58 Parameters 59 ---------- 60 func: Callable[[Any], Any] 61 Function to be added as a method of the class 62 63 class_def: Class 64 Class to be modified. 65 66 method_name: Optional[str], default None 67 New name of the method. None will use func.__name__ (default). 68 69 Returns 70 ------- 71 The modified function object. 72 73 """ 74 from functools import wraps 75 76 is_class = isinstance(class_def, type) 77 78 @wraps(func) 79 def wrapper(self, *args, **kw): 80 return func(*args, **kw) 81 82 if method_name is None: 83 method_name = func.__name__ 84 85 setattr(class_def, method_name, ( 86 wrapper if ((is_class and keep_self is None) or keep_self is False) else func 87 ) 88 ) 89 90 return func
Add function func
to class class_def
.
Parameters
- func (Callable[[Any], Any]): Function to be added as a method of the class
- class_def (Class): Class to be modified.
- method_name (Optional[str], default None): New name of the method. None will use func.__name__ (default).
Returns
- The modified function object.
93def generate_password(length: int = 12) -> str: 94 """ 95 Generate a secure password of given length. 96 97 Parameters 98 ---------- 99 length: int, default 12 100 The length of the password. 101 102 Returns 103 ------- 104 A random password string. 105 """ 106 import secrets 107 import string 108 return ''.join((secrets.choice(string.ascii_letters + string.digits) for i in range(length)))
Generate a secure password of given length.
Parameters
- length (int, default 12): The length of the password.
Returns
- A random password string.
111def is_int(s: str) -> bool: 112 """ 113 Check if string is an int. 114 115 Parameters 116 ---------- 117 s: str 118 The string to be checked. 119 120 Returns 121 ------- 122 A bool indicating whether the string was able to be cast to an integer. 123 124 """ 125 try: 126 float(s) 127 except Exception: 128 return False 129 130 return float(s).is_integer()
Check if string is an int.
Parameters
- s (str): The string to be checked.
Returns
- A bool indicating whether the string was able to be cast to an integer.
133def string_to_dict( 134 params_string: str 135) -> Dict[str, Any]: 136 """ 137 Parse a string into a dictionary. 138 139 If the string begins with '{', parse as JSON. Otherwise use simple parsing. 140 141 Parameters 142 ---------- 143 params_string: str 144 The string to be parsed. 145 146 Returns 147 ------- 148 The parsed dictionary. 149 150 Examples 151 -------- 152 >>> string_to_dict("a:1,b:2") 153 {'a': 1, 'b': 2} 154 >>> string_to_dict('{"a": 1, "b": 2}') 155 {'a': 1, 'b': 2} 156 157 """ 158 if params_string == "": 159 return {} 160 161 import json 162 163 ### Kind of a weird edge case. 164 ### In the generated compose file, there is some weird escaping happening, 165 ### so the string to be parsed starts and ends with a single quote. 166 if ( 167 isinstance(params_string, str) 168 and len(params_string) > 4 169 and params_string[1] == "{" 170 and params_string[-2] == "}" 171 ): 172 return json.loads(params_string[1:-1]) 173 if str(params_string).startswith('{'): 174 return json.loads(params_string) 175 176 import ast 177 params_dict = {} 178 for param in params_string.split(","): 179 _keys = param.split(":") 180 keys = _keys[:-1] 181 try: 182 val = ast.literal_eval(_keys[-1]) 183 except Exception: 184 val = str(_keys[-1]) 185 186 c = params_dict 187 for _k in keys[:-1]: 188 try: 189 k = ast.literal_eval(_k) 190 except Exception: 191 k = str(_k) 192 if k not in c: 193 c[k] = {} 194 c = c[k] 195 196 c[keys[-1]] = val 197 198 return params_dict
Parse a string into a dictionary.
If the string begins with '{', parse as JSON. Otherwise use simple parsing.
Parameters
- params_string (str): The string to be parsed.
Returns
- The parsed dictionary.
Examples
>>> string_to_dict("a:1,b:2")
{'a': 1, 'b': 2}
>>> string_to_dict('{"a": 1, "b": 2}')
{'a': 1, 'b': 2}
201def parse_config_substitution( 202 value: str, 203 leading_key: str = 'MRSM', 204 begin_key: str = '{', 205 end_key: str = '}', 206 delimeter: str = ':' 207) -> List[Any]: 208 """ 209 Parse Meerschaum substitution syntax 210 E.g. MRSM{value1:value2} => ['value1', 'value2'] 211 NOTE: Not currently used. See `search_and_substitute_config` in `meerschaum.config._read_yaml`. 212 """ 213 if not value.beginswith(leading_key): 214 return value 215 216 return leading_key[len(leading_key):][len():-1].split(delimeter)
Parse Meerschaum substitution syntax
E.g. MRSM{value1:value2} => ['value1', 'value2']
NOTE: Not currently used. See search_and_substitute_config
in meerschaum.config._read_yaml
.
219def edit_file( 220 path: Union['pathlib.Path', str], 221 default_editor: str = 'pyvim', 222 debug: bool = False 223) -> bool: 224 """ 225 Open a file for editing. 226 227 Attempt to launch the user's defined `$EDITOR`, otherwise use `pyvim`. 228 229 Parameters 230 ---------- 231 path: Union[pathlib.Path, str] 232 The path to the file to be edited. 233 234 default_editor: str, default 'pyvim' 235 If `$EDITOR` is not set, use this instead. 236 If `pyvim` is not installed, it will install it from PyPI. 237 238 debug: bool, default False 239 Verbosity toggle. 240 241 Returns 242 ------- 243 A bool indicating the file was successfully edited. 244 """ 245 import os 246 from subprocess import call 247 from meerschaum.utils.debug import dprint 248 from meerschaum.utils.packages import run_python_package, attempt_import, package_venv 249 try: 250 EDITOR = os.environ.get('EDITOR', default_editor) 251 if debug: 252 dprint(f"Opening file '{path}' with editor '{EDITOR}'...") 253 rc = call([EDITOR, path]) 254 except Exception as e: ### can't open with default editors 255 if debug: 256 dprint(str(e)) 257 dprint('Failed to open file with system editor. Falling back to pyvim...') 258 pyvim = attempt_import('pyvim', lazy=False) 259 rc = run_python_package('pyvim', [path], venv=package_venv(pyvim), debug=debug) 260 return rc == 0
Open a file for editing.
Attempt to launch the user's defined $EDITOR
, otherwise use pyvim
.
Parameters
- path (Union[pathlib.Path, str]): The path to the file to be edited.
- default_editor (str, default 'pyvim'):
If
$EDITOR
is not set, use this instead. Ifpyvim
is not installed, it will install it from PyPI. - debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating the file was successfully edited.
263def is_pipe_registered( 264 pipe: mrsm.Pipe, 265 pipes: PipesDict, 266 debug: bool = False 267) -> bool: 268 """ 269 Check if a Pipe is inside the pipes dictionary. 270 271 Parameters 272 ---------- 273 pipe: meerschaum.Pipe 274 The pipe to see if it's in the dictionary. 275 276 pipes: PipesDict 277 The dictionary to search inside. 278 279 debug: bool, default False 280 Verbosity toggle. 281 282 Returns 283 ------- 284 A bool indicating whether the pipe is inside the dictionary. 285 """ 286 from meerschaum.utils.debug import dprint 287 ck, mk, lk = pipe.connector_keys, pipe.metric_key, pipe.location_key 288 if debug: 289 dprint(f'{ck}, {mk}, {lk}') 290 dprint(f'{pipe}, {pipes}') 291 return ck in pipes and mk in pipes[ck] and lk in pipes[ck][mk]
Check if a Pipe is inside the pipes dictionary.
Parameters
- pipe (meerschaum.Pipe): The pipe to see if it's in the dictionary.
- pipes (PipesDict): The dictionary to search inside.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating whether the pipe is inside the dictionary.
294def get_cols_lines(default_cols: int = 100, default_lines: int = 120) -> Tuple[int, int]: 295 """ 296 Determine the columns and lines in the terminal. 297 If they cannot be determined, return the default values (100 columns and 120 lines). 298 299 Parameters 300 ---------- 301 default_cols: int, default 100 302 If the columns cannot be determined, return this value. 303 304 default_lines: int, default 120 305 If the lines cannot be determined, return this value. 306 307 Returns 308 ------- 309 A tuple if integers for the columns and lines. 310 """ 311 import os 312 try: 313 size = os.get_terminal_size() 314 _cols, _lines = size.columns, size.lines 315 except Exception as e: 316 _cols, _lines = ( 317 int(os.environ.get('COLUMNS', str(default_cols))), 318 int(os.environ.get('LINES', str(default_lines))), 319 ) 320 return _cols, _lines
Determine the columns and lines in the terminal. If they cannot be determined, return the default values (100 columns and 120 lines).
Parameters
- default_cols (int, default 100): If the columns cannot be determined, return this value.
- default_lines (int, default 120): If the lines cannot be determined, return this value.
Returns
- A tuple if integers for the columns and lines.
323def iterate_chunks(iterable, chunksize: int, fillvalue: Optional[Any] = None): 324 """ 325 Iterate over a list in chunks. 326 https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks 327 328 Parameters 329 ---------- 330 iterable: Iterable[Any] 331 The iterable to iterate over in chunks. 332 333 chunksize: int 334 The size of chunks to iterate with. 335 336 fillvalue: Optional[Any], default None 337 If the chunks do not evenly divide into the iterable, pad the end with this value. 338 339 Returns 340 ------- 341 A generator of tuples of size `chunksize`. 342 343 """ 344 from itertools import zip_longest 345 args = [iter(iterable)] * chunksize 346 return zip_longest(*args, fillvalue=fillvalue)
Iterate over a list in chunks. https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks
Parameters
- iterable (Iterable[Any]): The iterable to iterate over in chunks.
- chunksize (int): The size of chunks to iterate with.
- fillvalue (Optional[Any], default None): If the chunks do not evenly divide into the iterable, pad the end with this value.
Returns
- A generator of tuples of size
chunksize
.
348def sorted_dict(d: Dict[Any, Any]) -> Dict[Any, Any]: 349 """ 350 Sort a dictionary's values and return a new dictionary. 351 352 Parameters 353 ---------- 354 d: Dict[Any, Any] 355 The dictionary to be sorted. 356 357 Returns 358 ------- 359 A sorted dictionary. 360 361 Examples 362 -------- 363 >>> sorted_dict({'b': 1, 'a': 2}) 364 {'b': 1, 'a': 2} 365 >>> sorted_dict({'b': 2, 'a': 1}) 366 {'a': 1, 'b': 2} 367 368 """ 369 try: 370 return {key: value for key, value in sorted(d.items(), key=lambda item: item[1])} 371 except Exception as e: 372 return d
Sort a dictionary's values and return a new dictionary.
Parameters
- d (Dict[Any, Any]): The dictionary to be sorted.
Returns
- A sorted dictionary.
Examples
>>> sorted_dict({'b': 1, 'a': 2})
{'b': 1, 'a': 2}
>>> sorted_dict({'b': 2, 'a': 1})
{'a': 1, 'b': 2}
374def flatten_pipes_dict(pipes_dict: PipesDict) -> List[Pipe]: 375 """ 376 Convert the standard pipes dictionary into a list. 377 378 Parameters 379 ---------- 380 pipes_dict: PipesDict 381 The pipes dictionary to be flattened. 382 383 Returns 384 ------- 385 A list of `Pipe` objects. 386 387 """ 388 pipes_list = [] 389 for ck in pipes_dict.values(): 390 for mk in ck.values(): 391 pipes_list += list(mk.values()) 392 return pipes_list
Convert the standard pipes dictionary into a list.
Parameters
- pipes_dict (PipesDict): The pipes dictionary to be flattened.
Returns
- A list of
Pipe
objects.
395def round_time( 396 dt: Optional[datetime] = None, 397 date_delta: Optional[timedelta] = None, 398 to: 'str' = 'down' 399) -> datetime: 400 """ 401 Round a datetime object to a multiple of a timedelta. 402 http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python 403 404 NOTE: This function strips timezone information! 405 406 Parameters 407 ---------- 408 dt: Optional[datetime], default None 409 If `None`, grab the current UTC datetime. 410 411 date_delta: Optional[timedelta], default None 412 If `None`, use a delta of 1 minute. 413 414 to: 'str', default 'down' 415 Available options are `'up'`, `'down'`, and `'closest'`. 416 417 Returns 418 ------- 419 A rounded `datetime` object. 420 421 Examples 422 -------- 423 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200)) 424 datetime.datetime(2022, 1, 1, 12, 15) 425 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up') 426 datetime.datetime(2022, 1, 1, 12, 16) 427 >>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1)) 428 datetime.datetime(2022, 1, 1, 12, 0) 429 >>> round_time( 430 ... datetime(2022, 1, 1, 12, 15, 57, 200), 431 ... timedelta(hours=1), 432 ... to = 'closest' 433 ... ) 434 datetime.datetime(2022, 1, 1, 12, 0) 435 >>> round_time( 436 ... datetime(2022, 1, 1, 12, 45, 57, 200), 437 ... datetime.timedelta(hours=1), 438 ... to = 'closest' 439 ... ) 440 datetime.datetime(2022, 1, 1, 13, 0) 441 442 """ 443 if date_delta is None: 444 date_delta = timedelta(minutes=1) 445 round_to = date_delta.total_seconds() 446 if dt is None: 447 dt = datetime.now(timezone.utc).replace(tzinfo=None) 448 seconds = (dt.replace(tzinfo=None) - dt.min.replace(tzinfo=None)).seconds 449 450 if seconds % round_to == 0 and dt.microsecond == 0: 451 rounding = (seconds + round_to / 2) // round_to * round_to 452 else: 453 if to == 'up': 454 rounding = (seconds + dt.microsecond/1000000 + round_to) // round_to * round_to 455 elif to == 'down': 456 rounding = seconds // round_to * round_to 457 else: 458 rounding = (seconds + round_to / 2) // round_to * round_to 459 460 return dt + timedelta(0, rounding - seconds, - dt.microsecond)
Round a datetime object to a multiple of a timedelta. http://stackoverflow.com/questions/3463930/how-to-round-the-minute-of-a-datetime-object-python
NOTE: This function strips timezone information!
Parameters
- dt (Optional[datetime], default None):
If
None
, grab the current UTC datetime. - date_delta (Optional[timedelta], default None):
If
None
, use a delta of 1 minute. - to ('str', default 'down'):
Available options are
'up'
,'down'
, and'closest'
.
Returns
- A rounded
datetime
object.
Examples
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200))
datetime.datetime(2022, 1, 1, 12, 15)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), to='up')
datetime.datetime(2022, 1, 1, 12, 16)
>>> round_time(datetime(2022, 1, 1, 12, 15, 57, 200), timedelta(hours=1))
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 15, 57, 200),
... timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 12, 0)
>>> round_time(
... datetime(2022, 1, 1, 12, 45, 57, 200),
... datetime.timedelta(hours=1),
... to = 'closest'
... )
datetime.datetime(2022, 1, 1, 13, 0)
463def timed_input( 464 seconds: int = 10, 465 timeout_message: str = "", 466 prompt: str = "", 467 icon: bool = False, 468 **kw 469 ) -> Union[str, None]: 470 """ 471 Accept user input only for a brief period of time. 472 473 Parameters 474 ---------- 475 seconds: int, default 10 476 The number of seconds to wait. 477 478 timeout_message: str, default '' 479 The message to print after the window has elapsed. 480 481 prompt: str, default '' 482 The prompt to print during the window. 483 484 icon: bool, default False 485 If `True`, print the configured input icon. 486 487 488 Returns 489 ------- 490 The input string entered by the user. 491 492 """ 493 import signal, time 494 495 class TimeoutExpired(Exception): 496 """Raise this exception when the timeout is reached.""" 497 498 def alarm_handler(signum, frame): 499 raise TimeoutExpired 500 501 # set signal handler 502 signal.signal(signal.SIGALRM, alarm_handler) 503 signal.alarm(seconds) # produce SIGALRM in `timeout` seconds 504 505 try: 506 return input(prompt) 507 except TimeoutExpired: 508 return None 509 except (EOFError, RuntimeError): 510 try: 511 print(prompt) 512 time.sleep(seconds) 513 except TimeoutExpired: 514 return None 515 finally: 516 signal.alarm(0) # cancel alarm
Accept user input only for a brief period of time.
Parameters
- seconds (int, default 10): The number of seconds to wait.
- timeout_message (str, default ''): The message to print after the window has elapsed.
- prompt (str, default ''): The prompt to print during the window.
- icon (bool, default False):
If
True
, print the configured input icon.
Returns
- The input string entered by the user.
522def replace_pipes_in_dict( 523 pipes : Optional[PipesDict] = None, 524 func: 'function' = str, 525 debug: bool = False, 526 **kw 527 ) -> PipesDict: 528 """ 529 Replace the Pipes in a Pipes dict with the result of another function. 530 531 Parameters 532 ---------- 533 pipes: Optional[PipesDict], default None 534 The pipes dict to be processed. 535 536 func: Callable[[Any], Any], default str 537 The function to be applied to every pipe. 538 Defaults to the string constructor. 539 540 debug: bool, default False 541 Verbosity toggle. 542 543 544 Returns 545 ------- 546 A dictionary where every pipe is replaced with the output of a function. 547 548 """ 549 import copy 550 def change_dict(d : Dict[Any, Any], func : 'function') -> None: 551 for k, v in d.items(): 552 if isinstance(v, dict): 553 change_dict(v, func) 554 else: 555 d[k] = func(v) 556 557 if pipes is None: 558 from meerschaum import get_pipes 559 pipes = get_pipes(debug=debug, **kw) 560 561 result = copy.deepcopy(pipes) 562 change_dict(result, func) 563 return result
Replace the Pipes in a Pipes dict with the result of another function.
Parameters
- pipes (Optional[PipesDict], default None): The pipes dict to be processed.
- func (Callable[[Any], Any], default str): The function to be applied to every pipe. Defaults to the string constructor.
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary where every pipe is replaced with the output of a function.
565def enforce_gevent_monkey_patch(): 566 """ 567 Check if gevent monkey patching is enabled, and if not, then apply patching. 568 """ 569 from meerschaum.utils.packages import attempt_import 570 import socket 571 gevent, gevent_socket, gevent_monkey = attempt_import( 572 'gevent', 'gevent.socket', 'gevent.monkey' 573 ) 574 if not socket.socket is gevent_socket.socket: 575 gevent_monkey.patch_all()
Check if gevent monkey patching is enabled, and if not, then apply patching.
577def is_valid_email(email: str) -> Union['re.Match', None]: 578 """ 579 Check whether a string is a valid email. 580 581 Parameters 582 ---------- 583 email: str 584 The string to be examined. 585 586 Returns 587 ------- 588 None if a string is not in email format, otherwise a `re.Match` object, which is truthy. 589 590 Examples 591 -------- 592 >>> is_valid_email('foo') 593 >>> is_valid_email('foo@foo.com') 594 <re.Match object; span=(0, 11), match='foo@foo.com'> 595 596 """ 597 import re 598 regex = r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$' 599 return re.search(regex, email)
Check whether a string is a valid email.
Parameters
- email (str): The string to be examined.
Returns
- None if a string is not in email format, otherwise a
re.Match
object, which is truthy.
Examples
>>> is_valid_email('foo')
>>> is_valid_email('foo@foo.com')
<re.Match object; span=(0, 11), match='foo@foo.com'>
602def string_width(string: str, widest: bool = True) -> int: 603 """ 604 Calculate the width of a string, either by its widest or last line. 605 606 Parameters 607 ---------- 608 string: str: 609 The string to be examined. 610 611 widest: bool, default True 612 No longer used because `widest` is always assumed to be true. 613 614 Returns 615 ------- 616 An integer for the text's visual width. 617 618 Examples 619 -------- 620 >>> string_width('a') 621 1 622 >>> string_width('a\\nbc\\nd') 623 2 624 625 """ 626 def _widest(): 627 words = string.split('\n') 628 max_length = 0 629 for w in words: 630 length = len(w) 631 if length > max_length: 632 max_length = length 633 return max_length 634 635 return _widest()
Calculate the width of a string, either by its widest or last line.
Parameters
- string (str:): The string to be examined.
- widest (bool, default True):
No longer used because
widest
is always assumed to be true.
Returns
- An integer for the text's visual width.
Examples
>>> string_width('a')
1
>>> string_width('a\nbc\nd')
2
637def _pyinstaller_traverse_dir( 638 directory: str, 639 ignore_patterns: Iterable[str] = ('.pyc', 'dist', 'build', '.git', '.log'), 640 include_dotfiles: bool = False 641 ) -> list: 642 """ 643 Recursively traverse a directory and return a list of its contents. 644 """ 645 import os, pathlib 646 paths = [] 647 _directory = pathlib.Path(directory) 648 649 def _found_pattern(name: str): 650 for pattern in ignore_patterns: 651 if pattern.replace('/', os.path.sep) in str(name): 652 return True 653 return False 654 655 for root, dirs, files in os.walk(_directory): 656 _root = str(root)[len(str(_directory.parent)):] 657 if _root.startswith(os.path.sep): 658 _root = _root[len(os.path.sep):] 659 if _root.startswith('.') and not include_dotfiles: 660 continue 661 ### ignore certain patterns 662 if _found_pattern(_root): 663 continue 664 665 for filename in files: 666 if filename.startswith('.') and not include_dotfiles: 667 continue 668 path = os.path.join(root, filename) 669 if _found_pattern(path): 670 continue 671 672 _path = str(path)[len(str(_directory.parent)):] 673 if _path.startswith(os.path.sep): 674 _path = _path[len(os.path.sep):] 675 _path = os.path.sep.join(_path.split(os.path.sep)[:-1]) 676 677 paths.append((path, _path)) 678 return paths
Recursively traverse a directory and return a list of its contents.
681def replace_password(d: Dict[str, Any], replace_with: str = '*') -> Dict[str, Any]: 682 """ 683 Recursively replace passwords in a dictionary. 684 685 Parameters 686 ---------- 687 d: Dict[str, Any] 688 The dictionary to search through. 689 690 replace_with: str, default '*' 691 The string to replace each character of the password with. 692 693 Returns 694 ------- 695 Another dictionary where values to the keys `'password'` 696 are replaced with `replace_with` (`'*'`). 697 698 Examples 699 -------- 700 >>> replace_password({'a': 1}) 701 {'a': 1} 702 >>> replace_password({'password': '123'}) 703 {'password': '***'} 704 >>> replace_password({'nested': {'password': '123'}}) 705 {'nested': {'password': '***'}} 706 >>> replace_password({'password': '123'}, replace_with='!') 707 {'password': '!!!'} 708 709 """ 710 import copy 711 _d = copy.deepcopy(d) 712 for k, v in d.items(): 713 if isinstance(v, dict): 714 _d[k] = replace_password(v) 715 elif 'password' in str(k).lower(): 716 _d[k] = ''.join([replace_with for char in str(v)]) 717 elif str(k).lower() == 'uri': 718 from meerschaum.connectors.sql import SQLConnector 719 try: 720 uri_params = SQLConnector.parse_uri(v) 721 except Exception as e: 722 uri_params = None 723 if not uri_params: 724 continue 725 if not 'username' in uri_params or not 'password' in uri_params: 726 continue 727 _d[k] = v.replace( 728 uri_params['username'] + ':' + uri_params['password'], 729 uri_params['username'] + ':' + ''.join( 730 [replace_with for char in str(uri_params['password'])] 731 ) 732 ) 733 return _d
Recursively replace passwords in a dictionary.
Parameters
- d (Dict[str, Any]): The dictionary to search through.
- replace_with (str, default '*'): The string to replace each character of the password with.
Returns
- Another dictionary where values to the keys
'password'
- are replaced with
replace_with
('*'
).
Examples
>>> replace_password({'a': 1})
{'a': 1}
>>> replace_password({'password': '123'})
{'password': '***'}
>>> replace_password({'nested': {'password': '123'}})
{'nested': {'password': '***'}}
>>> replace_password({'password': '123'}, replace_with='!')
{'password': '!!!'}
736def filter_arguments( 737 func: Callable[[Any], Any], 738 *args: Any, 739 **kwargs: Any 740) -> Tuple[Tuple[Any], Dict[str, Any]]: 741 """ 742 Filter out unsupported positional and keyword arguments. 743 744 Parameters 745 ---------- 746 func: Callable[[Any], Any] 747 The function to inspect. 748 749 *args: Any 750 Positional arguments to filter and pass to `func`. 751 752 **kwargs 753 Keyword arguments to filter and pass to `func`. 754 755 Returns 756 ------- 757 The `args` and `kwargs` accepted by `func`. 758 """ 759 args = filter_positionals(func, *args) 760 kwargs = filter_keywords(func, **kwargs) 761 return args, kwargs
Filter out unsupported positional and keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any):
Positional arguments to filter and pass to
func
. - **kwargs: Keyword arguments to filter and pass to
func
.
Returns
- The
args
andkwargs
accepted byfunc
.
764def filter_keywords( 765 func: Callable[[Any], Any], 766 **kw: Any 767) -> Dict[str, Any]: 768 """ 769 Filter out unsupported keyword arguments. 770 771 Parameters 772 ---------- 773 func: Callable[[Any], Any] 774 The function to inspect. 775 776 **kw: Any 777 The arguments to be filtered and passed into `func`. 778 779 Returns 780 ------- 781 A dictionary of keyword arguments accepted by `func`. 782 783 Examples 784 -------- 785 ```python 786 >>> def foo(a=1, b=2): 787 ... return a * b 788 >>> filter_keywords(foo, a=2, b=4, c=6) 789 {'a': 2, 'b': 4} 790 >>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6})) 791 8 792 ``` 793 794 """ 795 import inspect 796 func_params = inspect.signature(func).parameters 797 ### If the function has a **kw method, skip filtering. 798 for param, _type in func_params.items(): 799 if '**' in str(_type): 800 return kw 801 return {k: v for k, v in kw.items() if k in func_params}
Filter out unsupported keyword arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- **kw (Any):
The arguments to be filtered and passed into
func
.
Returns
- A dictionary of keyword arguments accepted by
func
.
Examples
>>> def foo(a=1, b=2):
... return a * b
>>> filter_keywords(foo, a=2, b=4, c=6)
{'a': 2, 'b': 4}
>>> foo(**filter_keywords(foo, **{'a': 2, 'b': 4, 'c': 6}))
8
804def filter_positionals( 805 func: Callable[[Any], Any], 806 *args: Any 807) -> Tuple[Any]: 808 """ 809 Filter out unsupported positional arguments. 810 811 Parameters 812 ---------- 813 func: Callable[[Any], Any] 814 The function to inspect. 815 816 *args: Any 817 The arguments to be filtered and passed into `func`. 818 NOTE: If the function signature expects more arguments than provided, 819 the missing slots will be filled with `None`. 820 821 Returns 822 ------- 823 A tuple of positional arguments accepted by `func`. 824 825 Examples 826 -------- 827 ```python 828 >>> def foo(a, b): 829 ... return a * b 830 >>> filter_positionals(foo, 2, 4, 6) 831 (2, 4) 832 >>> foo(*filter_positionals(foo, 2, 4, 6)) 833 8 834 ``` 835 836 """ 837 import inspect 838 from meerschaum.utils.warnings import warn 839 func_params = inspect.signature(func).parameters 840 acceptable_args: List[Any] = [] 841 842 def _warn_invalids(_num_invalid): 843 if _num_invalid > 0: 844 warn( 845 "Too few arguments were provided. " 846 + f"{_num_invalid} argument" 847 + ('s have ' if _num_invalid != 1 else " has ") 848 + " been filled with `None`.", 849 ) 850 851 num_invalid: int = 0 852 for i, (param, val) in enumerate(func_params.items()): 853 if '=' in str(val) or '*' in str(val): 854 _warn_invalids(num_invalid) 855 return tuple(acceptable_args) 856 857 try: 858 acceptable_args.append(args[i]) 859 except IndexError: 860 acceptable_args.append(None) 861 num_invalid += 1 862 863 _warn_invalids(num_invalid) 864 return tuple(acceptable_args)
Filter out unsupported positional arguments.
Parameters
- func (Callable[[Any], Any]): The function to inspect.
- *args (Any):
The arguments to be filtered and passed into
func
. NOTE: If the function signature expects more arguments than provided, the missing slots will be filled withNone
.
Returns
- A tuple of positional arguments accepted by
func
.
Examples
>>> def foo(a, b):
... return a * b
>>> filter_positionals(foo, 2, 4, 6)
(2, 4)
>>> foo(*filter_positionals(foo, 2, 4, 6))
8
867def dict_from_od(od: collections.OrderedDict) -> Dict[Any, Any]: 868 """ 869 Convert an ordered dict to a dict. 870 Does not mutate the original OrderedDict. 871 """ 872 from collections import OrderedDict 873 _d = dict(od) 874 for k, v in od.items(): 875 if isinstance(v, OrderedDict) or ( 876 issubclass(type(v), OrderedDict) 877 ): 878 _d[k] = dict_from_od(v) 879 return _d
Convert an ordered dict to a dict. Does not mutate the original OrderedDict.
881def remove_ansi(s: str) -> str: 882 """ 883 Remove ANSI escape characters from a string. 884 885 Parameters 886 ---------- 887 s: str: 888 The string to be cleaned. 889 890 Returns 891 ------- 892 A string with the ANSI characters removed. 893 894 Examples 895 -------- 896 >>> remove_ansi("\x1b[1;31mHello, World!\x1b[0m") 897 'Hello, World!' 898 899 """ 900 import re 901 return re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])').sub('', s)
Remove ANSI escape characters from a string.
Parameters
- s (str:): The string to be cleaned.
Returns
- A string with the ANSI characters removed.
Examples
>>> remove_ansi("[1;31mHello, World![0m")
'Hello, World!'
904def get_connector_labels( 905 *types: str, 906 search_term: str = '', 907 ignore_exact_match = True, 908 _additional_options: Optional[List[str]] = None, 909) -> List[str]: 910 """ 911 Read connector labels from the configuration dictionary. 912 913 Parameters 914 ---------- 915 *types: str 916 The connector types. 917 If none are provided, use the defined types (`'sql'` and `'api'`) and `'plugin'`. 918 919 search_term: str, default '' 920 A filter on the connectors' labels. 921 922 ignore_exact_match: bool, default True 923 If `True`, skip a connector if the search_term is an exact match. 924 925 Returns 926 ------- 927 A list of the keys of defined connectors. 928 929 """ 930 from meerschaum.config import get_config 931 connectors = get_config('meerschaum', 'connectors') 932 933 _types = list(types) 934 if len(_types) == 0: 935 _types = list(connectors.keys()) + ['plugin'] 936 937 conns = [] 938 for t in _types: 939 if t == 'plugin': 940 from meerschaum.plugins import get_data_plugins 941 conns += [ 942 f'{t}:' + plugin.module.__name__.split('.')[-1] 943 for plugin in get_data_plugins() 944 ] 945 continue 946 conns += [ f'{t}:{label}' for label in connectors.get(t, {}) if label != 'default' ] 947 948 if _additional_options: 949 conns += _additional_options 950 951 possibilities = [ 952 c 953 for c in conns 954 if c.startswith(search_term) 955 and c != ( 956 search_term if ignore_exact_match else '' 957 ) 958 ] 959 return sorted(possibilities)
Read connector labels from the configuration dictionary.
Parameters
- *types (str):
The connector types.
If none are provided, use the defined types (
'sql'
and'api'
) and'plugin'
. - search_term (str, default ''): A filter on the connectors' labels.
- ignore_exact_match (bool, default True):
If
True
, skip a connector if the search_term is an exact match.
Returns
- A list of the keys of defined connectors.
962def wget( 963 url: str, 964 dest: Optional[Union[str, 'pathlib.Path']] = None, 965 headers: Optional[Dict[str, Any]] = None, 966 color: bool = True, 967 debug: bool = False, 968 **kw: Any 969) -> 'pathlib.Path': 970 """ 971 Mimic `wget` with `requests`. 972 973 Parameters 974 ---------- 975 url: str 976 The URL to the resource to be downloaded. 977 978 dest: Optional[Union[str, pathlib.Path]], default None 979 The destination path of the downloaded file. 980 If `None`, save to the current directory. 981 982 color: bool, default True 983 If `debug` is `True`, print color output. 984 985 debug: bool, default False 986 Verbosity toggle. 987 988 Returns 989 ------- 990 The path to the downloaded file. 991 992 """ 993 from meerschaum.utils.warnings import warn, error 994 from meerschaum.utils.debug import dprint 995 import os, pathlib, re, urllib.request 996 if headers is None: 997 headers = {} 998 request = urllib.request.Request(url, headers=headers) 999 if not color: 1000 dprint = print 1001 if debug: 1002 dprint(f"Downloading from '{url}'...") 1003 try: 1004 response = urllib.request.urlopen(request) 1005 except Exception as e: 1006 import ssl 1007 ssl._create_default_https_context = ssl._create_unverified_context 1008 try: 1009 response = urllib.request.urlopen(request) 1010 except Exception as _e: 1011 print(_e) 1012 response = None 1013 if response is None or response.code != 200: 1014 error_msg = f"Failed to download from '{url}'." 1015 if color: 1016 error(error_msg) 1017 else: 1018 print(error_msg) 1019 import sys 1020 sys.exit(1) 1021 1022 d = response.headers.get('content-disposition', None) 1023 fname = ( 1024 re.findall("filename=(.+)", d)[0].strip('"') if d is not None 1025 else url.split('/')[-1] 1026 ) 1027 1028 if dest is None: 1029 dest = pathlib.Path(os.path.join(os.getcwd(), fname)) 1030 elif isinstance(dest, str): 1031 dest = pathlib.Path(dest) 1032 1033 with open(dest, 'wb') as f: 1034 f.write(response.fp.read()) 1035 1036 if debug: 1037 dprint(f"Downloaded file '{dest}'.") 1038 1039 return dest
Mimic wget
with requests
.
Parameters
- url (str): The URL to the resource to be downloaded.
- dest (Optional[Union[str, pathlib.Path]], default None):
The destination path of the downloaded file.
If
None
, save to the current directory. - color (bool, default True):
If
debug
isTrue
, print color output. - debug (bool, default False): Verbosity toggle.
Returns
- The path to the downloaded file.
1042def async_wrap(func): 1043 """ 1044 Run a synchronous function as async. 1045 https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn 1046 """ 1047 import asyncio 1048 from functools import wraps, partial 1049 1050 @wraps(func) 1051 async def run(*args, loop=None, executor=None, **kwargs): 1052 if loop is None: 1053 loop = asyncio.get_event_loop() 1054 pfunc = partial(func, *args, **kwargs) 1055 return await loop.run_in_executor(executor, pfunc) 1056 return run
Run a synchronous function as async. https://dev.to/0xbf/turn-sync-function-to-async-python-tips-58nn
1059def debug_trace(browser: bool = True): 1060 """ 1061 Open a web-based debugger to trace the execution of the program. 1062 1063 This is an alias import for `meerschaum.utils.debug.debug_trace`. 1064 """ 1065 from meerschaum.utils.debug import trace 1066 trace(browser=browser)
Open a web-based debugger to trace the execution of the program.
This is an alias import for meerschaum.utils.debug.debug_trace
.
1069def items_str( 1070 items: List[Any], 1071 quotes: bool = True, 1072 quote_str: str = "'", 1073 commas: bool = True, 1074 comma_str: str = ',', 1075 and_: bool = True, 1076 and_str: str = 'and', 1077 oxford_comma: bool = True, 1078 spaces: bool = True, 1079 space_str = ' ', 1080) -> str: 1081 """ 1082 Return a formatted string if list items separated by commas. 1083 1084 Parameters 1085 ---------- 1086 items: [List[Any]] 1087 The items to be printed as an English list. 1088 1089 quotes: bool, default True 1090 If `True`, wrap items in quotes. 1091 1092 quote_str: str, default "'" 1093 If `quotes` is `True`, prepend and append each item with this string. 1094 1095 and_: bool, default True 1096 If `True`, include the word 'and' before the final item in the list. 1097 1098 and_str: str, default 'and' 1099 If `and_` is True, insert this string where 'and' normally would in and English list. 1100 1101 oxford_comma: bool, default True 1102 If `True`, include the Oxford Comma (comma before the final 'and'). 1103 Only applies when `and_` is `True`. 1104 1105 spaces: bool, default True 1106 If `True`, separate items with `space_str` 1107 1108 space_str: str, default ' ' 1109 If `spaces` is `True`, separate items with this string. 1110 1111 Returns 1112 ------- 1113 A string of the items as an English list. 1114 1115 Examples 1116 -------- 1117 >>> items_str([1,2,3]) 1118 "'1', '2', and '3'" 1119 >>> items_str([1,2,3], quotes=False) 1120 '1, 2, and 3' 1121 >>> items_str([1,2,3], and_=False) 1122 "'1', '2', '3'" 1123 >>> items_str([1,2,3], spaces=False, and_=False) 1124 "'1','2','3'" 1125 >>> items_str([1,2,3], oxford_comma=False) 1126 "'1', '2' and '3'" 1127 >>> items_str([1,2,3], quote_str=":") 1128 ':1:, :2:, and :3:' 1129 >>> items_str([1,2,3], and_str="or") 1130 "'1', '2', or '3'" 1131 >>> items_str([1,2,3], space_str="_") 1132 "'1',_'2',_and_'3'" 1133 1134 """ 1135 if not items: 1136 return '' 1137 1138 q = quote_str if quotes else '' 1139 s = space_str if spaces else '' 1140 a = and_str if and_ else '' 1141 c = comma_str if commas else '' 1142 1143 if len(items) == 1: 1144 return q + str(list(items)[0]) + q 1145 1146 if len(items) == 2: 1147 return q + str(list(items)[0]) + q + s + a + s + q + str(list(items)[1]) + q 1148 1149 sep = q + c + s + q 1150 output = q + sep.join(str(i) for i in items[:-1]) + q 1151 if oxford_comma: 1152 output += c 1153 output += s + a + (s if and_ else '') + q + str(items[-1]) + q 1154 return output
Return a formatted string if list items separated by commas.
Parameters
- items ([List[Any]]): The items to be printed as an English list.
- quotes (bool, default True):
If
True
, wrap items in quotes. - quote_str (str, default "'"):
If
quotes
isTrue
, prepend and append each item with this string. - and_ (bool, default True):
If
True
, include the word 'and' before the final item in the list. - and_str (str, default 'and'):
If
and_
is True, insert this string where 'and' normally would in and English list. - oxford_comma (bool, default True):
If
True
, include the Oxford Comma (comma before the final 'and'). Only applies whenand_
isTrue
. - spaces (bool, default True):
If
True
, separate items withspace_str
- space_str (str, default ' '):
If
spaces
isTrue
, separate items with this string.
Returns
- A string of the items as an English list.
Examples
>>> items_str([1,2,3])
"'1', '2', and '3'"
>>> items_str([1,2,3], quotes=False)
'1, 2, and 3'
>>> items_str([1,2,3], and_=False)
"'1', '2', '3'"
>>> items_str([1,2,3], spaces=False, and_=False)
"'1','2','3'"
>>> items_str([1,2,3], oxford_comma=False)
"'1', '2' and '3'"
>>> items_str([1,2,3], quote_str=":")
':1:, :2:, and :3:'
>>> items_str([1,2,3], and_str="or")
"'1', '2', or '3'"
>>> items_str([1,2,3], space_str="_")
"'1',_'2',_and_'3'"
1157def interval_str(delta: Union[timedelta, int]) -> str: 1158 """ 1159 Return a human-readable string for a `timedelta` (or `int` minutes). 1160 1161 Parameters 1162 ---------- 1163 delta: Union[timedelta, int] 1164 The interval to print. If `delta` is an integer, assume it corresponds to minutes. 1165 1166 Returns 1167 ------- 1168 A formatted string, fit for human eyes. 1169 """ 1170 from meerschaum.utils.packages import attempt_import 1171 if is_int(delta): 1172 return str(delta) 1173 humanfriendly = attempt_import('humanfriendly') 1174 delta_seconds = ( 1175 delta.total_seconds() 1176 if isinstance(delta, timedelta) 1177 else (delta * 60) 1178 ) 1179 return humanfriendly.format_timespan(delta_seconds)
Return a human-readable string for a timedelta
(or int
minutes).
Parameters
- delta (Union[timedelta, int]):
The interval to print. If
delta
is an integer, assume it corresponds to minutes.
Returns
- A formatted string, fit for human eyes.
1182def is_docker_available() -> bool: 1183 """Check if we can connect to the Docker engine.""" 1184 import subprocess 1185 try: 1186 has_docker = subprocess.call( 1187 ['docker', 'ps'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1188 ) == 0 1189 except Exception: 1190 has_docker = False 1191 return has_docker
Check if we can connect to the Docker engine.
1194def is_android() -> bool: 1195 """Return `True` if the current platform is Android.""" 1196 import sys 1197 return hasattr(sys, 'getandroidapilevel')
Return True
if the current platform is Android.
1200def is_bcp_available() -> bool: 1201 """Check if the MSSQL `bcp` utility is installed.""" 1202 import subprocess 1203 1204 try: 1205 has_bcp = subprocess.call( 1206 ['bcp', '-v'], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT 1207 ) == 0 1208 except Exception: 1209 has_bcp = False 1210 return has_bcp
Check if the MSSQL bcp
utility is installed.
1213def is_systemd_available() -> bool: 1214 """Check if running on systemd.""" 1215 import subprocess 1216 try: 1217 has_systemctl = subprocess.call( 1218 ['systemctl', '-h'], 1219 stdout=subprocess.DEVNULL, 1220 stderr=subprocess.STDOUT, 1221 ) == 0 1222 except Exception: 1223 has_systemctl = False 1224 return has_systemctl
Check if running on systemd.
1227def is_tmux_available() -> bool: 1228 """ 1229 Check if `tmux` is installed. 1230 """ 1231 import subprocess 1232 try: 1233 has_tmux = subprocess.call( 1234 ['tmux', '-V'], 1235 stdout=subprocess.DEVNULL, 1236 stderr=subprocess.STDOUT 1237 ) == 0 1238 except Exception: 1239 has_tmux = False 1240 return has_tmux
Check if tmux
is installed.
1242def get_last_n_lines(file_name: str, N: int): 1243 """ 1244 https://thispointer.com/python-get-last-n-lines-of-a-text-file-like-tail-command/ 1245 """ 1246 import os 1247 # Create an empty list to keep the track of last N lines 1248 list_of_lines = [] 1249 # Open file for reading in binary mode 1250 with open(file_name, 'rb') as read_obj: 1251 # Move the cursor to the end of the file 1252 read_obj.seek(0, os.SEEK_END) 1253 # Create a buffer to keep the last read line 1254 buffer = bytearray() 1255 # Get the current position of pointer i.e eof 1256 pointer_location = read_obj.tell() 1257 # Loop till pointer reaches the top of the file 1258 while pointer_location >= 0: 1259 # Move the file pointer to the location pointed by pointer_location 1260 read_obj.seek(pointer_location) 1261 # Shift pointer location by -1 1262 pointer_location = pointer_location -1 1263 # read that byte / character 1264 new_byte = read_obj.read(1) 1265 # If the read byte is new line character then it means one line is read 1266 if new_byte == b'\n': 1267 # Save the line in list of lines 1268 list_of_lines.append(buffer.decode()[::-1]) 1269 # If the size of list reaches N, then return the reversed list 1270 if len(list_of_lines) == N: 1271 return list(reversed(list_of_lines)) 1272 # Reinitialize the byte array to save next line 1273 buffer = bytearray() 1274 else: 1275 # If last read character is not eol then add it in buffer 1276 buffer.extend(new_byte) 1277 # As file is read completely, if there is still data in buffer, then its first line. 1278 if len(buffer) > 0: 1279 list_of_lines.append(buffer.decode()[::-1]) 1280 # return the reversed list 1281 return list(reversed(list_of_lines))
1284def tail(f, n, offset=None): 1285 """ 1286 https://stackoverflow.com/a/692616/9699829 1287 1288 Reads n lines from f with an offset of offset lines. The return 1289 value is a tuple in the form ``(lines, has_more)`` where `has_more` is 1290 an indicator that is `True` if there are more lines in the file. 1291 """ 1292 avg_line_length = 74 1293 to_read = n + (offset or 0) 1294 1295 while True: 1296 try: 1297 f.seek(-(avg_line_length * to_read), 2) 1298 except IOError: 1299 # woops. apparently file is smaller than what we want 1300 # to step back, go to the beginning instead 1301 f.seek(0) 1302 pos = f.tell() 1303 lines = f.read().splitlines() 1304 if len(lines) >= to_read or pos == 0: 1305 return lines[-to_read:offset and -offset or None], \ 1306 len(lines) > to_read or pos > 0 1307 avg_line_length *= 1.3
https://stackoverflow.com/a/692616/9699829
Reads n lines from f with an offset of offset lines. The return
value is a tuple in the form (lines, has_more)
where has_more
is
an indicator that is True
if there are more lines in the file.
1310def truncate_string_sections(item: str, delimeter: str = '_', max_len: int = 128) -> str: 1311 """ 1312 Remove characters from each section of a string until the length is within the limit. 1313 1314 Parameters 1315 ---------- 1316 item: str 1317 The item name to be truncated. 1318 1319 delimeter: str, default '_' 1320 Split `item` by this string into several sections. 1321 1322 max_len: int, default 128 1323 The max acceptable length of the truncated version of `item`. 1324 1325 Returns 1326 ------- 1327 The truncated string. 1328 1329 Examples 1330 -------- 1331 >>> truncate_string_sections('abc_def_ghi', max_len=10) 1332 'ab_de_gh' 1333 1334 """ 1335 if len(item) < max_len: 1336 return item 1337 1338 def _shorten(s: str) -> str: 1339 return s[:-1] if len(s) > 1 else s 1340 1341 sections = list(enumerate(item.split('_'))) 1342 sorted_sections = sorted(sections, key=lambda x: (-1 * len(x[1]))) 1343 available_chars = max_len - len(sections) 1344 1345 _sections = [(i, s) for i, s in sorted_sections] 1346 _sections_len = sum([len(s) for i, s in _sections]) 1347 _old_sections_len = _sections_len 1348 while _sections_len > available_chars: 1349 _sections = [(i, _shorten(s)) for i, s in _sections] 1350 _old_sections_len = _sections_len 1351 _sections_len = sum([len(s) for i, s in _sections]) 1352 if _old_sections_len == _sections_len: 1353 raise Exception(f"String could not be truncated: '{item}'") 1354 1355 new_sections = sorted(_sections, key=lambda x: x[0]) 1356 return delimeter.join([s for i, s in new_sections])
Remove characters from each section of a string until the length is within the limit.
Parameters
- item (str): The item name to be truncated.
- delimeter (str, default '_'):
Split
item
by this string into several sections. - max_len (int, default 128):
The max acceptable length of the truncated version of
item
.
Returns
- The truncated string.
Examples
>>> truncate_string_sections('abc_def_ghi', max_len=10)
'ab_de_gh'
1359def separate_negation_values( 1360 vals: Union[List[str], Tuple[str]], 1361 negation_prefix: Optional[str] = None, 1362) -> Tuple[List[str], List[str]]: 1363 """ 1364 Separate the negated values from the positive ones. 1365 Return two lists: positive and negative values. 1366 1367 Parameters 1368 ---------- 1369 vals: Union[List[str], Tuple[str]] 1370 A list of strings to parse. 1371 1372 negation_prefix: Optional[str], default None 1373 Include values that start with this string in the second list. 1374 If `None`, use the system default (`_`). 1375 """ 1376 if negation_prefix is None: 1377 from meerschaum.config.static import STATIC_CONFIG 1378 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 1379 _in_vals, _ex_vals = [], [] 1380 for v in vals: 1381 if str(v).startswith(negation_prefix): 1382 _ex_vals.append(str(v)[len(negation_prefix):]) 1383 else: 1384 _in_vals.append(v) 1385 1386 return _in_vals, _ex_vals
Separate the negated values from the positive ones. Return two lists: positive and negative values.
Parameters
- vals (Union[List[str], Tuple[str]]): A list of strings to parse.
- negation_prefix (Optional[str], default None):
Include values that start with this string in the second list.
If
None
, use the system default (_
).
1389def get_in_ex_params(params: Optional[Dict[str, Any]]) -> Dict[str, Tuple[List[Any], List[Any]]]: 1390 """ 1391 Translate a params dictionary into lists of include- and exclude-values. 1392 1393 Parameters 1394 ---------- 1395 params: Optional[Dict[str, Any]] 1396 A params query dictionary. 1397 1398 Returns 1399 ------- 1400 A dictionary mapping keys to a tuple of lists for include and exclude values. 1401 1402 Examples 1403 -------- 1404 >>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']}) 1405 {'a': (['b', 'c', 'e'], ['d', 'f'])} 1406 """ 1407 if not params: 1408 return {} 1409 return { 1410 col: separate_negation_values( 1411 ( 1412 val 1413 if isinstance(val, (list, tuple)) 1414 else [val] 1415 ) 1416 ) 1417 for col, val in params.items() 1418 }
Translate a params dictionary into lists of include- and exclude-values.
Parameters
- params (Optional[Dict[str, Any]]): A params query dictionary.
Returns
- A dictionary mapping keys to a tuple of lists for include and exclude values.
Examples
>>> get_in_ex_params({'a': ['b', 'c', '_d', 'e', '_f']})
{'a': (['b', 'c', 'e'], ['d', 'f'])}
1421def flatten_list(list_: List[Any]) -> List[Any]: 1422 """ 1423 Recursively flatten a list. 1424 """ 1425 for item in list_: 1426 if isinstance(item, list): 1427 yield from flatten_list(item) 1428 else: 1429 yield item
Recursively flatten a list.
1432def parse_arguments_str(args_str: str) -> Tuple[Tuple[Any], Dict[str, Any]]: 1433 """ 1434 Parse a string containing the text to be passed into a function 1435 and return a tuple of args, kwargs. 1436 1437 Parameters 1438 ---------- 1439 args_str: str 1440 The contents of the function parameter (as a string). 1441 1442 Returns 1443 ------- 1444 A tuple of args (tuple) and kwargs (dict[str, Any]). 1445 1446 Examples 1447 -------- 1448 >>> parse_arguments_str('123, 456, foo=789, bar="baz"') 1449 (123, 456), {'foo': 789, 'bar': 'baz'} 1450 """ 1451 import ast 1452 args = [] 1453 kwargs = {} 1454 1455 for part in args_str.split(','): 1456 if '=' in part: 1457 key, val = part.split('=', 1) 1458 kwargs[key.strip()] = ast.literal_eval(val) 1459 else: 1460 args.append(ast.literal_eval(part.strip())) 1461 1462 return tuple(args), kwargs
Parse a string containing the text to be passed into a function and return a tuple of args, kwargs.
Parameters
- args_str (str): The contents of the function parameter (as a string).
Returns
- A tuple of args (tuple) and kwargs (dict[str, Any]).
Examples
>>> parse_arguments_str('123, 456, foo=789, bar="baz"')
(123, 456), {'foo': 789, 'bar': 'baz'}
1465def make_symlink(src_path: 'pathlib.Path', dest_path: 'pathlib.Path') -> SuccessTuple: 1466 """ 1467 Wrap around `pathlib.Path.symlink_to`, but add support for Windows. 1468 1469 Parameters 1470 ---------- 1471 src_path: pathlib.Path 1472 The source path. 1473 1474 dest_path: pathlib.Path 1475 The destination path. 1476 1477 Returns 1478 ------- 1479 A SuccessTuple indicating success. 1480 """ 1481 if dest_path.exists() and dest_path.resolve() == src_path.resolve(): 1482 return True, "Symlink already exists." 1483 try: 1484 dest_path.symlink_to(src_path) 1485 success = True 1486 except Exception as e: 1487 success = False 1488 msg = str(e) 1489 if success: 1490 return success, "Success" 1491 1492 ### Failed to create a symlink. 1493 ### If we're not on Windows, return an error. 1494 import platform 1495 if platform.system() != 'Windows': 1496 return success, msg 1497 1498 try: 1499 import _winapi 1500 except ImportError: 1501 return False, "Unable to import _winapi." 1502 1503 if src_path.is_dir(): 1504 try: 1505 _winapi.CreateJunction(str(src_path), str(dest_path)) 1506 except Exception as e: 1507 return False, str(e) 1508 return True, "Success" 1509 1510 ### Last resort: copy the file on Windows. 1511 import shutil 1512 try: 1513 shutil.copy(src_path, dest_path) 1514 except Exception as e: 1515 return False, str(e) 1516 1517 return True, "Success"
Wrap around pathlib.Path.symlink_to
, but add support for Windows.
Parameters
- src_path (pathlib.Path): The source path.
- dest_path (pathlib.Path): The destination path.
Returns
- A SuccessTuple indicating success.
1520def is_symlink(path: pathlib.Path) -> bool: 1521 """ 1522 Wrap `path.is_symlink()` but add support for Windows junctions. 1523 """ 1524 if path.is_symlink(): 1525 return True 1526 import platform 1527 import os 1528 if platform.system() != 'Windows': 1529 return False 1530 try: 1531 return bool(os.readlink(path)) 1532 except OSError: 1533 return False
Wrap path.is_symlink()
but add support for Windows junctions.
1536def parametrized(dec): 1537 """ 1538 A meta-decorator for allowing other decorator functions to have parameters. 1539 1540 https://stackoverflow.com/a/26151604/9699829 1541 """ 1542 def layer(*args, **kwargs): 1543 def repl(f): 1544 return dec(f, *args, **kwargs) 1545 return repl 1546 return layer
A meta-decorator for allowing other decorator functions to have parameters.
1549def safely_extract_tar(tarf: 'file', output_dir: Union[str, 'pathlib.Path']) -> None: 1550 """ 1551 Safely extract a TAR file to a give directory. 1552 This defends against CVE-2007-4559. 1553 1554 Parameters 1555 ---------- 1556 tarf: file 1557 The TAR file opened with `tarfile.open(path, 'r:gz')`. 1558 1559 output_dir: Union[str, pathlib.Path] 1560 The output directory. 1561 """ 1562 import os 1563 1564 def is_within_directory(directory, target): 1565 abs_directory = os.path.abspath(directory) 1566 abs_target = os.path.abspath(target) 1567 prefix = os.path.commonprefix([abs_directory, abs_target]) 1568 return prefix == abs_directory 1569 1570 def safe_extract(tar, path=".", members=None, *, numeric_owner=False): 1571 for member in tar.getmembers(): 1572 member_path = os.path.join(path, member.name) 1573 if not is_within_directory(path, member_path): 1574 raise Exception("Attempted Path Traversal in Tar File") 1575 1576 tar.extractall(path=path, members=members, numeric_owner=numeric_owner) 1577 1578 return safe_extract(tarf, output_dir)
Safely extract a TAR file to a give directory. This defends against CVE-2007-4559.
Parameters
- tarf (file):
The TAR file opened with
tarfile.open(path, 'r:gz')
. - output_dir (Union[str, pathlib.Path]): The output directory.
1585def choose_subaction(*args, **kwargs) -> Any: 1586 """ 1587 Placeholder function to prevent breaking legacy behavior. 1588 See `meerschaum.actions.choose_subaction`. 1589 """ 1590 from meerschaum.actions import choose_subaction as _choose_subactions 1591 return _choose_subactions(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See meerschaum.actions.choose_subaction
.
1594def print_options(*args, **kwargs) -> None: 1595 """ 1596 Placeholder function to prevent breaking legacy behavior. 1597 See `meerschaum.utils.formatting.print_options`. 1598 """ 1599 from meerschaum.utils.formatting import print_options as _print_options 1600 return _print_options(*args, **kwargs)
Placeholder function to prevent breaking legacy behavior.
See meerschaum.utils.formatting.print_options
.
1711def json_serialize_datetime(dt: datetime) -> Union[str, None]: 1712 """ 1713 Serialize a datetime object into JSON (ISO format string). 1714 1715 Examples 1716 -------- 1717 >>> import json 1718 >>> from datetime import datetime 1719 >>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime) 1720 '{"a": "2022-01-01T00:00:00Z"}' 1721 1722 """ 1723 from meerschaum.utils.dtypes import serialize_datetime 1724 return serialize_datetime(dt)
Serialize a datetime object into JSON (ISO format string).
Examples
>>> import json
>>> from datetime import datetime
>>> json.dumps({'a': datetime(2022, 1, 1)}, default=json_serialize_datetime)
'{"a": "2022-01-01T00:00:00Z"}'