meerschaum.utils.formatting
Utilities for formatting output text
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Utilities for formatting output text 7""" 8 9from __future__ import annotations 10import platform 11import os 12import sys 13import meerschaum as mrsm 14from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable 15from meerschaum.utils.formatting._shell import make_header 16from meerschaum.utils.formatting._pprint import pprint 17from meerschaum.utils.formatting._pipes import ( 18 pprint_pipes, 19 highlight_pipes, 20 format_pipe_success_tuple, 21 print_pipes_results, 22 extract_stats_from_message, 23 pipe_repr, 24) 25from meerschaum.utils.threading import Lock, RLock 26 27_attrs = { 28 'ANSI': None, 29 'UNICODE': None, 30 'CHARSET': None, 31 'RESET': '\033[0m', 32} 33__all__ = sorted([ 34 'ANSI', 'CHARSET', 'UNICODE', 'RESET', 35 'colored', 36 'translate_rich_to_termcolor', 37 'get_console', 38 'format_success_tuple', 39 'print_tuple', 40 'print_options', 41 'fill_ansi', 42 'pprint', 43 'highlight_pipes', 44 'pprint_pipes', 45 'make_header', 46 'pipe_repr', 47 'print_pipes_results', 48 'extract_stats_from_message', 49]) 50__pdoc__ = {} 51_locks = { 52 '_colorama_init': RLock(), 53} 54 55 56def colored_fallback(*args, **kw): 57 return ' '.join(args) 58 59def translate_rich_to_termcolor(*colors) -> tuple: 60 """Translate between rich and more_termcolor terminology.""" 61 _colors = [] 62 for c in colors: 63 _c_list = [] 64 ### handle 'bright' 65 c = c.replace('bright_', 'bright ') 66 67 ### handle 'on' 68 if ' on ' in c: 69 _on = c.split(' on ') 70 _colors.append(_on[0]) 71 for _c in _on[1:]: 72 _c_list.append('on ' + _c) 73 else: 74 _c_list += [c] 75 76 _colors += _c_list 77 78 return tuple(_colors) 79 80 81def rich_text_to_str(text: 'rich.text.Text') -> str: 82 """Convert a `rich.text.Text` object to a string with ANSI in-tact.""" 83 _console = get_console() 84 if _console is None: 85 return str(text) 86 with console.capture() as cap: 87 console.print(text) 88 string = cap.get() 89 return string[:-1] 90 91 92def _init(): 93 """ 94 Initial color settings (mostly for Windows). 95 """ 96 if platform.system() != "Windows": 97 return 98 if 'PYTHONIOENCODING' not in os.environ: 99 os.environ['PYTHONIOENCODING'] = 'utf-8' 100 if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ: 101 os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8' 102 sys.stdin.reconfigure(encoding='utf-8') 103 sys.stdout.reconfigure(encoding='utf-8') 104 sys.stderr.reconfigure(encoding='utf-8') 105 106 from ctypes import windll 107 k = windll.kernel32 108 k.SetConsoleMode(k.GetStdHandle(-11), 7) 109 os.system("color") 110 111 from meerschaum.utils.packages import attempt_import 112 ### init colorama for Windows color output 113 colorama, more_termcolor = attempt_import( 114 'colorama', 115 'more_termcolor', 116 lazy = False, 117 warn = False, 118 color = False, 119 ) 120 try: 121 colorama.init(autoreset=False) 122 success = True 123 except Exception: 124 import traceback 125 traceback.print_exc() 126 _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii' 127 success = False 128 129 if more_termcolor is None: 130 _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii' 131 success = False 132 133 return success 134 135_colorama_init = False 136def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']: 137 """Apply colors and rich styles to a string. 138 If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string. 139 Otherwise attempt to use the legacy `more_termcolor.colored` method. 140 141 Parameters 142 ---------- 143 text: str 144 The string to apply formatting to. 145 146 *colors: 147 A list of colors to pass to `more_termcolor.colored()`. 148 Has no effect if `style` is provided. 149 150 style: str, default None 151 If provided, pass to `rich` for processing. 152 153 as_rich_text: bool, default False 154 If `True`, return a `rich.Text` object. 155 `style` must be provided. 156 157 **kw: 158 Keyword arguments to pass to `rich.text.Text` or `more_termcolor`. 159 160 161 Returns 162 ------- 163 An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`. 164 165 """ 166 from meerschaum.utils.packages import import_rich, attempt_import 167 global _colorama_init 168 _init() 169 with _locks['_colorama_init']: 170 if not _colorama_init: 171 _colorama_init = _init() 172 173 if 'style' in kw: 174 rich = import_rich() 175 rich_text = attempt_import('rich.text') 176 text_obj = rich_text.Text(text, **kw) 177 if as_rich_text: 178 return text_obj 179 return rich_text_to_str(text_obj) 180 181 more_termcolor = attempt_import('more_termcolor', lazy=False) 182 try: 183 colored_text = more_termcolor.colored(text, *colors, **kw) 184 except Exception as e: 185 colored_text = None 186 187 if colored_text is not None: 188 return colored_text 189 190 try: 191 _colors = translate_rich_to_termcolor(*colors) 192 colored_text = more_termcolor.colored(text, *_colors, **kw) 193 except Exception as e: 194 colored_text = None 195 196 if colored_text is None: 197 ### NOTE: warn here? 198 return text 199 200 return colored_text 201 202console = None 203def get_console(): 204 """Get the rich console.""" 205 global console 206 from meerschaum.utils.packages import import_rich, attempt_import 207 rich = import_rich() 208 rich_console = attempt_import('rich.console') 209 try: 210 console = rich_console.Console(force_terminal=True, color_system='truecolor') 211 except Exception: 212 import traceback 213 traceback.print_exc() 214 console = None 215 return console 216 217 218def print_tuple( 219 tup: mrsm.SuccessTuple, 220 skip_common: bool = True, 221 common_only: bool = False, 222 upper_padding: int = 1, 223 lower_padding: int = 1, 224 left_padding: int = 1, 225 calm: bool = False, 226 _progress: Optional['rich.progress.Progress'] = None, 227) -> None: 228 """ 229 Format `meerschaum.utils.typing.SuccessTuple`. 230 231 Parameters 232 ---------- 233 skip_common: bool, default True 234 If `True`, do not print common success tuples (i.e. `(True, "Success")`). 235 236 common_only: bool, default False 237 If `True`, only print if the success tuple is common. 238 239 upper_padding: int, default 0 240 How many newlines to prepend to the message. 241 242 lower_padding: int, default 0 243 How many newlines to append to the message. 244 245 left_padding: int, default 1 246 How mant spaces to preprend to the message. 247 248 calm: bool, default False 249 If `True`, use the default emoji and color scheme. 250 251 """ 252 from meerschaum._internal.static import STATIC_CONFIG 253 do_print = True 254 255 omit_messages = STATIC_CONFIG['system']['success']['ignore'] 256 257 if common_only: 258 skip_common = False 259 do_print = tup[1].strip() in omit_messages 260 261 if skip_common: 262 do_print = tup[1].strip() not in omit_messages 263 264 if not do_print: 265 return 266 267 print(format_success_tuple( 268 tup, 269 upper_padding=upper_padding, 270 lower_padding=lower_padding, 271 calm=calm, 272 _progress=_progress, 273 )) 274 275 276def format_success_tuple( 277 tup: mrsm.SuccessTuple, 278 upper_padding: int = 0, 279 lower_padding: int = 0, 280 left_padding: int = 1, 281 calm: bool = False, 282 _progress: Optional['rich.progress.Progress'] = None, 283) -> str: 284 """ 285 Format `meerschaum.utils.typing.SuccessTuple`. 286 287 Parameters 288 ---------- 289 upper_padding: int, default 0 290 How many newlines to prepend to the message. 291 292 lower_padding: int, default 0 293 How many newlines to append to the message. 294 295 left_padding: int, default 1 296 How mant spaces to preprend to the message. 297 298 calm: bool, default False 299 If `True`, use the default emoji and color scheme. 300 """ 301 _init() 302 try: 303 status = 'success' if tup[0] else 'failure' 304 except TypeError: 305 status = 'failure' 306 tup = None, None 307 308 if calm: 309 status += '_calm' 310 311 ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET') 312 from meerschaum.config import get_config 313 status_config = get_config('formatting', status, patch=True) 314 315 msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1])) 316 lines = msg.split('\n') 317 lines = [lines[0]] + [ 318 ((' ' + line if not line.startswith(' ') else line)) 319 for line in lines[1:] 320 ] 321 if ANSI: 322 lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich']) 323 324 msg = '\n'.join(lines) 325 msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding) 326 return msg 327 328 329def print_options( 330 options: Optional[Iterable[Any]] = None, 331 nopretty: bool = False, 332 no_rich: bool = False, 333 name: str = 'options', 334 header: Optional[str] = None, 335 num_cols: Optional[int] = None, 336 adjust_cols: bool = True, 337 sort_options: bool = False, 338 number_options: bool = False, 339 **kw 340) -> None: 341 """ 342 Print items in an iterable as a fancy table. 343 344 Parameters 345 ---------- 346 options: Optional[Dict[str, Any]], default None 347 The iterable to be printed. 348 349 nopretty: bool, default False 350 If `True`, don't use fancy formatting. 351 352 no_rich: bool, default False 353 If `True`, don't use `rich` to format the output. 354 355 name: str, default 'options' 356 The text in the default header after `'Available'`. 357 358 header: Optional[str], default None 359 If provided, override `name` and use this as the header text. 360 361 num_cols: Optional[int], default None 362 How many columns in the table. Depends on the terminal size. If `None`, use 8. 363 364 adjust_cols: bool, default True 365 If `True`, adjust the number of columns depending on the terminal size. 366 367 sort_options: bool, default False 368 If `True`, print the options in sorted order. 369 370 number_options: bool, default False 371 If `True`, print the option's number in the list (1 index). 372 373 """ 374 from meerschaum.utils.packages import import_rich 375 from meerschaum.utils.formatting import highlight_pipes 376 from meerschaum.utils.misc import get_cols_lines, string_width 377 378 if options is None: 379 options = {} 380 _options = [] 381 for o in options: 382 _options.append(str(o)) 383 if sort_options: 384 _options = sorted(_options) 385 _header = f"\nAvailable {name}" if header is None else header 386 387 if num_cols is None: 388 num_cols = 8 389 390 def _print_options_no_rich(): 391 if not nopretty: 392 print() 393 print(make_header(_header)) 394 ### print actions 395 for i, option in enumerate(_options): 396 marker = '-' if not number_options else (str(i + 1) + '.') 397 if not nopretty: 398 print(f" {marker} ", end="") 399 print(option) 400 if not nopretty: 401 print() 402 403 rich = import_rich() 404 if rich is None or nopretty or no_rich: 405 _print_options_no_rich() 406 return None 407 408 ### Prevent too many options from being truncated on small terminals. 409 if adjust_cols and _options: 410 _cols, _lines = get_cols_lines() 411 while num_cols > 1: 412 cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols) 413 num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options]) 414 if num_too_big > int(len(_options) / 3): 415 num_cols -= 1 416 continue 417 break 418 419 from meerschaum.utils.packages import attempt_import 420 rich_table = attempt_import('rich.table') 421 Text = attempt_import('rich.text').Text 422 box = attempt_import('rich.box') 423 Table = rich_table.Table 424 425 if _header is not None: 426 table = Table( 427 title=_header, 428 box=box.SIMPLE, 429 show_header=False, 430 show_footer=False, 431 title_style='', 432 expand = True, 433 ) 434 else: 435 table = Table.grid(padding=(0, 2)) 436 for i in range(num_cols): 437 table.add_column() 438 439 if len(_options) < 12: 440 ### If fewer than 12 items, use a single column 441 for i, option in enumerate(_options): 442 item = highlight_pipes(option) 443 if number_options: 444 item = str(i + 1) + '. ' + item 445 table.add_row(Text.from_ansi(item)) 446 else: 447 ### Otherwise, use multiple columns as before 448 num_rows = (len(_options) + num_cols - 1) // num_cols 449 item_ix = 0 450 for i in range(num_rows): 451 row = [] 452 for j in range(num_cols): 453 index = i + j * num_rows 454 if index < len(_options): 455 item = highlight_pipes(_options[index]) 456 if number_options: 457 item = str(i + 1) + '. ' + item 458 row.append(Text.from_ansi(item)) 459 item_ix += 1 460 else: 461 row.append('') 462 table.add_row(*row) 463 464 get_console().print(table) 465 return None 466 467 468def fill_ansi(string: str, style: str = '') -> str: 469 """ 470 Fill in non-formatted segments of ANSI text. 471 472 Parameters 473 ---------- 474 string: str 475 A string which contains ANSI escape codes. 476 477 style: str 478 Style arguments to pass to `rich.text.Text`. 479 480 Returns 481 ------- 482 A string with ANSI styling applied to the segments which don't yet have a style applied. 483 """ 484 from meerschaum.utils.packages import import_rich, attempt_import 485 from meerschaum.utils.misc import iterate_chunks 486 _ = import_rich() 487 rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text') 488 Text = rich_text.Text 489 try: 490 msg = Text.from_ansi(string) 491 except AttributeError: 492 import traceback 493 traceback.print_exc() 494 msg = '' 495 496 plain_indices = [] 497 for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)): 498 left = left_span.end 499 right = right_span.start if not isinstance(right_span, int) else right_span 500 if left != right: 501 plain_indices.append((left, right)) 502 if msg.spans: 503 if msg.spans[0].start != 0: 504 plain_indices = [(0, msg.spans[0].start)] + plain_indices 505 if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg): 506 plain_indices.append((msg.spans[-1].end, len(msg))) 507 508 if plain_indices: 509 for left, right in plain_indices: 510 msg.stylize(style, left, right) 511 else: 512 msg = Text(str(msg), style) 513 514 return rich_text_to_str(msg) 515 516 517def __getattr__(name: str) -> str: 518 """ 519 Lazily load module-level variables. 520 """ 521 if name.startswith('__') and name.endswith('__'): 522 raise AttributeError("Cannot import dunders from this module.") 523 524 if name in _attrs: 525 if _attrs[name] is not None: 526 return _attrs[name] 527 from meerschaum.config import get_config 528 if name.lower() in get_config('formatting'): 529 _attrs[name] = get_config('formatting', name.lower()) 530 elif name == 'CHARSET': 531 _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii' 532 return _attrs[name] 533 534 if name == '__wrapped__': 535 import sys 536 return sys.modules[__name__] 537 if name == '__all__': 538 return __all__ 539 540 try: 541 return globals()[name] 542 except KeyError: 543 raise AttributeError(f"Could not find '{name}'")
137def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']: 138 """Apply colors and rich styles to a string. 139 If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string. 140 Otherwise attempt to use the legacy `more_termcolor.colored` method. 141 142 Parameters 143 ---------- 144 text: str 145 The string to apply formatting to. 146 147 *colors: 148 A list of colors to pass to `more_termcolor.colored()`. 149 Has no effect if `style` is provided. 150 151 style: str, default None 152 If provided, pass to `rich` for processing. 153 154 as_rich_text: bool, default False 155 If `True`, return a `rich.Text` object. 156 `style` must be provided. 157 158 **kw: 159 Keyword arguments to pass to `rich.text.Text` or `more_termcolor`. 160 161 162 Returns 163 ------- 164 An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`. 165 166 """ 167 from meerschaum.utils.packages import import_rich, attempt_import 168 global _colorama_init 169 _init() 170 with _locks['_colorama_init']: 171 if not _colorama_init: 172 _colorama_init = _init() 173 174 if 'style' in kw: 175 rich = import_rich() 176 rich_text = attempt_import('rich.text') 177 text_obj = rich_text.Text(text, **kw) 178 if as_rich_text: 179 return text_obj 180 return rich_text_to_str(text_obj) 181 182 more_termcolor = attempt_import('more_termcolor', lazy=False) 183 try: 184 colored_text = more_termcolor.colored(text, *colors, **kw) 185 except Exception as e: 186 colored_text = None 187 188 if colored_text is not None: 189 return colored_text 190 191 try: 192 _colors = translate_rich_to_termcolor(*colors) 193 colored_text = more_termcolor.colored(text, *_colors, **kw) 194 except Exception as e: 195 colored_text = None 196 197 if colored_text is None: 198 ### NOTE: warn here? 199 return text 200 201 return colored_text
Apply colors and rich styles to a string.
If a style
keyword is provided, a rich.text.Text
object will be parsed into a string.
Otherwise attempt to use the legacy more_termcolor.colored
method.
Parameters
- text (str): The string to apply formatting to.
- *colors:: A list of colors to pass to
more_termcolor.colored()
. Has no effect ifstyle
is provided. - style (str, default None):
If provided, pass to
rich
for processing. - as_rich_text (bool, default False):
If
True
, return arich.Text
object.style
must be provided. - **kw:: Keyword arguments to pass to
rich.text.Text
ormore_termcolor
.
Returns
- An ANSI-formatted string or a
rich.text.Text
object ifas_rich_text
isTrue
.
487def extract_stats_from_message( 488 message: str, 489 stat_keys: Optional[List[str]] = None, 490) -> Dict[str, int]: 491 """ 492 Given a sync message, return the insert, update, upsert stats from within. 493 494 Parameters 495 ---------- 496 message: str 497 The message to parse for statistics. 498 499 stat_keys: Optional[List[str]], default None 500 If provided, search for these words (case insensitive) in the message. 501 Defaults to `['inserted', 'updated', 'upserted']`. 502 503 Returns 504 ------- 505 A dictionary mapping the stat keys to the total number of rows affected. 506 """ 507 stat_keys = stat_keys or ['inserted', 'updated', 'upserted', 'checked'] 508 lines_stats = [extract_stats_from_line(line, stat_keys) for line in message.split('\n')] 509 message_stats = { 510 stat_key: sum(stats.get(stat_key, 0) for stats in lines_stats) 511 for stat_key in stat_keys 512 } 513 return message_stats
Given a sync message, return the insert, update, upsert stats from within.
Parameters
- message (str): The message to parse for statistics.
- stat_keys (Optional[List[str]], default None):
If provided, search for these words (case insensitive) in the message.
Defaults to
['inserted', 'updated', 'upserted']
.
Returns
- A dictionary mapping the stat keys to the total number of rows affected.
469def fill_ansi(string: str, style: str = '') -> str: 470 """ 471 Fill in non-formatted segments of ANSI text. 472 473 Parameters 474 ---------- 475 string: str 476 A string which contains ANSI escape codes. 477 478 style: str 479 Style arguments to pass to `rich.text.Text`. 480 481 Returns 482 ------- 483 A string with ANSI styling applied to the segments which don't yet have a style applied. 484 """ 485 from meerschaum.utils.packages import import_rich, attempt_import 486 from meerschaum.utils.misc import iterate_chunks 487 _ = import_rich() 488 rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text') 489 Text = rich_text.Text 490 try: 491 msg = Text.from_ansi(string) 492 except AttributeError: 493 import traceback 494 traceback.print_exc() 495 msg = '' 496 497 plain_indices = [] 498 for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)): 499 left = left_span.end 500 right = right_span.start if not isinstance(right_span, int) else right_span 501 if left != right: 502 plain_indices.append((left, right)) 503 if msg.spans: 504 if msg.spans[0].start != 0: 505 plain_indices = [(0, msg.spans[0].start)] + plain_indices 506 if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg): 507 plain_indices.append((msg.spans[-1].end, len(msg))) 508 509 if plain_indices: 510 for left, right in plain_indices: 511 msg.stylize(style, left, right) 512 else: 513 msg = Text(str(msg), style) 514 515 return rich_text_to_str(msg)
Fill in non-formatted segments of ANSI text.
Parameters
- string (str): A string which contains ANSI escape codes.
- style (str):
Style arguments to pass to
rich.text.Text
.
Returns
- A string with ANSI styling applied to the segments which don't yet have a style applied.
277def format_success_tuple( 278 tup: mrsm.SuccessTuple, 279 upper_padding: int = 0, 280 lower_padding: int = 0, 281 left_padding: int = 1, 282 calm: bool = False, 283 _progress: Optional['rich.progress.Progress'] = None, 284) -> str: 285 """ 286 Format `meerschaum.utils.typing.SuccessTuple`. 287 288 Parameters 289 ---------- 290 upper_padding: int, default 0 291 How many newlines to prepend to the message. 292 293 lower_padding: int, default 0 294 How many newlines to append to the message. 295 296 left_padding: int, default 1 297 How mant spaces to preprend to the message. 298 299 calm: bool, default False 300 If `True`, use the default emoji and color scheme. 301 """ 302 _init() 303 try: 304 status = 'success' if tup[0] else 'failure' 305 except TypeError: 306 status = 'failure' 307 tup = None, None 308 309 if calm: 310 status += '_calm' 311 312 ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET') 313 from meerschaum.config import get_config 314 status_config = get_config('formatting', status, patch=True) 315 316 msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1])) 317 lines = msg.split('\n') 318 lines = [lines[0]] + [ 319 ((' ' + line if not line.startswith(' ') else line)) 320 for line in lines[1:] 321 ] 322 if ANSI: 323 lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich']) 324 325 msg = '\n'.join(lines) 326 msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding) 327 return msg
Format meerschaum.utils.typing.SuccessTuple
.
Parameters
- upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How mant spaces to preprend to the message.
- calm (bool, default False):
If
True
, use the default emoji and color scheme.
204def get_console(): 205 """Get the rich console.""" 206 global console 207 from meerschaum.utils.packages import import_rich, attempt_import 208 rich = import_rich() 209 rich_console = attempt_import('rich.console') 210 try: 211 console = rich_console.Console(force_terminal=True, color_system='truecolor') 212 except Exception: 213 import traceback 214 traceback.print_exc() 215 console = None 216 return console
Get the rich console.
345def highlight_pipes(message: str) -> str: 346 """ 347 Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects. 348 """ 349 if 'Pipe(' not in message: 350 return message 351 352 from meerschaum.utils.misc import parse_arguments_str 353 segments = message.split('Pipe(') 354 msg = '' 355 for i, segment in enumerate(segments): 356 if i == 0: 357 msg += segment 358 continue 359 360 paren_index = segment.find(')') 361 if paren_index == -1: 362 msg += 'Pipe(' + segment 363 continue 364 365 pipe_args_str = segment[:paren_index] 366 try: 367 args, kwargs = parse_arguments_str(pipe_args_str) 368 pipe_dict = { 369 'connector_keys': args[0], 370 'metric_key': args[1], 371 } 372 if len(args) > 2: 373 pipe_dict['location_key'] = args[2] 374 if 'instance' in kwargs: 375 pipe_dict['instance_keys'] = kwargs['instance'] 376 377 _to_add = pipe_repr(pipe_dict) + segment[paren_index + 1:] 378 except Exception: 379 _to_add = 'Pipe(' + segment 380 msg += _to_add 381 return msg
Add syntax highlighting to an info message containing stringified meerschaum.Pipe
objects.
17def make_header(message: str, ruler: str = '─', left_pad: int = 2) -> str: 18 """Format a message string with a ruler. 19 Length of the ruler is the length of the longest word. 20 21 Example: 22 'My\nheader' -> ' My\n header\n ──────' 23 """ 24 25 from meerschaum.utils.formatting import ANSI, UNICODE, colored 26 if not UNICODE: 27 ruler = '-' 28 words = message.split('\n') 29 max_length = 0 30 for w in words: 31 length = len(w) 32 if length > max_length: 33 max_length = length 34 35 left_buffer = left_pad * ' ' 36 37 return ( 38 left_buffer 39 + message.replace('\n', '\n' + left_buffer) 40 + "\n" 41 + left_buffer 42 + (ruler * max_length) 43 )
Format a message string with a ruler. Length of the ruler is the length of the longest word.
Example:
'My
header' -> ' My header ──────'
280def pipe_repr( 281 pipe: Union[mrsm.Pipe, Dict[str, Any]], 282 as_rich_text: bool = False, 283 ansi: Optional[bool] = None, 284) -> Union[str, 'rich.text.Text']: 285 """ 286 Return a formatted string for representing a `meerschaum.Pipe`. 287 """ 288 from meerschaum.utils.formatting import ANSI, colored, rich_text_to_str 289 from meerschaum.utils.packages import import_rich, attempt_import 290 import meerschaum as mrsm 291 292 _ = import_rich() 293 Text = attempt_import('rich.text').Text 294 295 if isinstance(pipe, mrsm.Pipe): 296 connector_keys = pipe.connector_keys 297 metric_key = pipe.metric_key 298 location_key = pipe.location_key 299 instance_keys = pipe.instance_keys 300 else: 301 connector_keys = pipe.get('connector_keys') 302 metric_key = pipe.get('metric_key') 303 location_key = pipe.get('location_key') 304 instance_keys = pipe.get('instance_keys', get_config('meerschaum', 'instance')) 305 306 styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles') 307 if not ANSI or (ansi is False): 308 styles = {k: '' for k in styles} 309 _pipe_style_prefix, _pipe_style_suffix = ( 310 (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe'] 311 else ('', '') 312 ) 313 text_obj = ( 314 Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix) 315 + colored(("'" + connector_keys + "'"), style=styles['connector'], as_rich_text=True) 316 + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix) 317 + colored(("'" + metric_key + "'"), style=styles['metric'], as_rich_text=True) 318 + ( 319 ( 320 colored(', ', style=styles['punctuation'], as_rich_text=True) 321 + colored( 322 ("'" + location_key + "'"), 323 style=styles['location'], as_rich_text=True 324 ) 325 ) if location_key is not None 326 else colored('', style='', as_rich_text=True) 327 ) + ( 328 ( ### Add the `instance=` argument. 329 colored(', instance=', style=styles['punctuation'], as_rich_text=True) 330 + colored( 331 ("'" + instance_keys + "'"), 332 style=styles['instance'], as_rich_text=True 333 ) 334 ) if instance_keys != get_config('meerschaum', 'instance') 335 else colored('', style='', as_rich_text=True) 336 ) 337 + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix) 338 ) 339 if as_rich_text: 340 return text_obj 341 return rich_text_to_str(text_obj).replace('\n', '')
Return a formatted string for representing a meerschaum.Pipe
.
10def pprint( 11 *args, 12 detect_password: bool = True, 13 nopretty: bool = False, 14 **kw 15) -> None: 16 """Pretty print an object according to the configured ANSI and UNICODE settings. 17 If detect_password is True (default), search and replace passwords with '*' characters. 18 Does not mutate objects. 19 """ 20 import copy 21 import json 22 from meerschaum.utils.packages import attempt_import, import_rich 23 from meerschaum.utils.formatting import ANSI, get_console, print_tuple 24 from meerschaum.utils.warnings import error 25 from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords 26 from collections import OrderedDict 27 28 if ( 29 len(args) == 1 30 and 31 isinstance(args[0], tuple) 32 and 33 len(args[0]) == 2 34 and 35 isinstance(args[0][0], bool) 36 and 37 isinstance(args[0][1], str) 38 ): 39 return print_tuple(args[0], **filter_keywords(print_tuple, **kw)) 40 41 modify = True 42 rich_pprint = None 43 if ANSI and not nopretty: 44 rich = import_rich() 45 if rich is not None: 46 rich_pretty = attempt_import('rich.pretty') 47 if rich_pretty is not None: 48 def _rich_pprint(*args, **kw): 49 _console = get_console() 50 _kw = filter_keywords(_console.print, **kw) 51 _console.print(*args, **_kw) 52 rich_pprint = _rich_pprint 53 elif not nopretty: 54 pprintpp = attempt_import('pprintpp', warn=False) 55 try: 56 _pprint = pprintpp.pprint 57 except Exception : 58 import pprint as _pprint_module 59 _pprint = _pprint_module.pprint 60 61 func = ( 62 _pprint if rich_pprint is None else rich_pprint 63 ) if not nopretty else print 64 65 try: 66 args_copy = copy.deepcopy(args) 67 except Exception: 68 args_copy = args 69 modify = False 70 71 _args = [] 72 for a in args: 73 c = a 74 ### convert OrderedDict into dict 75 if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict): 76 c = dict_from_od(copy.deepcopy(c)) 77 _args.append(c) 78 args = _args 79 80 _args = list(args) 81 if detect_password and modify: 82 _args = [] 83 for a in args: 84 c = a 85 if isinstance(c, dict): 86 c = replace_password(copy.deepcopy(c)) 87 if nopretty: 88 try: 89 c = json.dumps(c) 90 is_json = True 91 except Exception: 92 is_json = False 93 if not is_json: 94 try: 95 c = str(c) 96 except Exception: 97 pass 98 _args.append(c) 99 100 ### filter out unsupported keywords 101 func_kw = filter_keywords(func, **kw) if not nopretty else {} 102 error_msg = None 103 try: 104 func(*_args, **func_kw) 105 except Exception as e: 106 error_msg = e 107 if error_msg is not None: 108 error(error_msg)
Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.
17def pprint_pipes(pipes: PipesDict) -> None: 18 """Print a stylized tree of a Pipes dictionary. 19 Supports ANSI and UNICODE global settings.""" 20 from meerschaum.utils.warnings import error 21 from meerschaum.utils.packages import attempt_import, import_rich 22 from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict 23 from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console 24 import copy 25 rich = import_rich('rich', warn=False) 26 Text = None 27 if rich is not None: 28 rich_text = attempt_import('rich.text', lazy=False) 29 Text = rich_text.Text 30 31 icons = get_config('formatting', 'pipes', CHARSET, 'icons') 32 styles = get_config('formatting', 'pipes', 'ansi', 'styles') 33 if not ANSI: 34 styles = {k: '' for k in styles} 35 print() 36 37 def ascii_print_pipes(): 38 """Print the dictionary with no unicode allowed. Also works in case rich fails to import 39 (though rich should auto-install when `attempt_import()` is called).""" 40 asciitree = attempt_import('asciitree') 41 ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}} 42 for conn_keys, metrics in pipes.items(): 43 _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector']) 44 if Text is not None: 45 replace_dict['connector'][_colored_conn_key] = ( 46 Text(conn_keys, style=styles['connector']) 47 ) 48 ascii_dict[_colored_conn_key] = {} 49 for metric, locations in metrics.items(): 50 _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric']) 51 if Text is not None: 52 replace_dict['metric'][_colored_metric_key] = ( 53 Text(metric, style=styles['metric']) 54 ) 55 ascii_dict[_colored_conn_key][_colored_metric_key] = {} 56 for location, pipe in locations.items(): 57 _location_style = styles[('none' if location is None else 'location')] 58 pipe_addendum = '\n ' + pipe.__repr__() + '\n' 59 _colored_location = colored( 60 icons['location'] + str(location), style=_location_style 61 ) 62 _colored_location_key = _colored_location + pipe_addendum 63 if Text is not None: 64 replace_dict['location'][_colored_location] = ( 65 Text(str(location), style=_location_style) 66 ) 67 ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {} 68 69 tree = asciitree.LeftAligned() 70 output = '' 71 cols = [] 72 73 ### This is pretty terrible, unreadable code. 74 ### Please know that I'm normally better than this. 75 key_str = ( 76 (Text(" ") if Text is not None else " ") + 77 ( 78 Text("Key", style='underline') if Text is not None else 79 colored("Key", style='underline') 80 ) + (Text('\n\n ') if Text is not None else '\n\n ') + 81 ( 82 Text("Connector", style=styles['connector']) if Text is not None else 83 colored("Connector", style=styles['connector']) 84 ) + (Text('\n +-- ') if Text is not None else '\n +-- ') + 85 ( 86 Text("Metric", style=styles['metric']) if Text is not None else 87 colored("Metric", style=styles['metric']) 88 ) + (Text('\n +-- ') if Text is not None else '\n +-- ') + 89 ( 90 Text("Location", style=styles['location']) if Text is not None else 91 colored("Location", style=styles['location']) 92 ) + (Text('\n\n') if Text is not None else '\n\n') 93 ) 94 95 output += str(key_str) 96 cols.append(key_str) 97 98 def replace_tree_text(tree_str : str) -> Text: 99 """Replace the colored words with stylized Text instead. 100 Is not executed if ANSI and UNICODE are disabled.""" 101 tree_text = Text(tree_str) if Text is not None else None 102 for k, v in replace_dict.items(): 103 for _colored, _text in v.items(): 104 parts = [] 105 lines = tree_text.split(_colored) 106 for part in lines: 107 parts += [part, _text] 108 if lines[-1] != Text(''): 109 parts = parts[:-1] 110 _tree_text = Text('') 111 for part in parts: 112 _tree_text += part 113 tree_text = _tree_text 114 return tree_text 115 116 tree_output = "" 117 for k, v in ascii_dict.items(): 118 branch = {k : v} 119 tree_output += tree(branch) + '\n\n' 120 if not UNICODE and not ANSI: 121 _col = (Text(tree(branch)) if Text is not None else tree(branch)) 122 else: 123 _col = replace_tree_text(tree(branch)) 124 cols.append(_col) 125 if len(output) > 0: 126 tree_output = tree_output[:-2] 127 output += tree_output 128 129 if rich is None: 130 return print(output) 131 132 rich_columns = attempt_import('rich.columns') 133 Columns = rich_columns.Columns 134 columns = Columns(cols) 135 get_console().print(columns) 136 137 if not UNICODE: 138 return ascii_print_pipes() 139 140 rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import( 141 'rich.panel', 142 'rich.tree', 143 'rich.text', 144 'rich.columns', 145 'rich.table', 146 ) 147 from rich import box 148 Panel = rich_panel.Panel 149 Tree = rich_tree.Tree 150 Text = rich_text.Text 151 Columns = rich_columns.Columns 152 Table = rich_table.Table 153 154 key_panel = Panel( 155 ( 156 Text("\n") + 157 Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") + 158 Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") + 159 Text(icons['location'] + "Location", style=styles['location']) + Text("\n") 160 ), 161 title = Text(icons['key'] + "Keys", style=styles['guide']), 162 border_style = styles['guide'], 163 expand = True 164 ) 165 166 cols = [] 167 conn_trees = {} 168 metric_trees = {} 169 pipes = sorted_dict(pipes) 170 for conn_keys, metrics in pipes.items(): 171 conn_trees[conn_keys] = Tree( 172 Text( 173 icons['connector'] + conn_keys, 174 style = styles['connector'], 175 ), 176 guide_style = styles['connector'] 177 ) 178 metric_trees[conn_keys] = {} 179 for metric, locations in metrics.items(): 180 metric_trees[conn_keys][metric] = Tree( 181 Text( 182 icons['metric'] + metric, 183 style = styles['metric'] 184 ), 185 guide_style = styles['metric'] 186 ) 187 conn_trees[conn_keys].add(metric_trees[conn_keys][metric]) 188 for location, pipe in locations.items(): 189 _location = ( 190 Text(str(location), style=styles['none']) if location is None 191 else Text(location, style=styles['location']) 192 ) 193 _location = ( 194 Text(icons['location']) 195 + _location + Text('\n') 196 + pipe_repr(pipe, as_rich_text=True) + Text('\n') 197 ) 198 metric_trees[conn_keys][metric].add(_location) 199 200 cols += [key_panel] 201 for k, t in conn_trees.items(): 202 cols.append(t) 203 204 columns = Columns(cols) 205 get_console().print(columns)
Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.
330def print_options( 331 options: Optional[Iterable[Any]] = None, 332 nopretty: bool = False, 333 no_rich: bool = False, 334 name: str = 'options', 335 header: Optional[str] = None, 336 num_cols: Optional[int] = None, 337 adjust_cols: bool = True, 338 sort_options: bool = False, 339 number_options: bool = False, 340 **kw 341) -> None: 342 """ 343 Print items in an iterable as a fancy table. 344 345 Parameters 346 ---------- 347 options: Optional[Dict[str, Any]], default None 348 The iterable to be printed. 349 350 nopretty: bool, default False 351 If `True`, don't use fancy formatting. 352 353 no_rich: bool, default False 354 If `True`, don't use `rich` to format the output. 355 356 name: str, default 'options' 357 The text in the default header after `'Available'`. 358 359 header: Optional[str], default None 360 If provided, override `name` and use this as the header text. 361 362 num_cols: Optional[int], default None 363 How many columns in the table. Depends on the terminal size. If `None`, use 8. 364 365 adjust_cols: bool, default True 366 If `True`, adjust the number of columns depending on the terminal size. 367 368 sort_options: bool, default False 369 If `True`, print the options in sorted order. 370 371 number_options: bool, default False 372 If `True`, print the option's number in the list (1 index). 373 374 """ 375 from meerschaum.utils.packages import import_rich 376 from meerschaum.utils.formatting import highlight_pipes 377 from meerschaum.utils.misc import get_cols_lines, string_width 378 379 if options is None: 380 options = {} 381 _options = [] 382 for o in options: 383 _options.append(str(o)) 384 if sort_options: 385 _options = sorted(_options) 386 _header = f"\nAvailable {name}" if header is None else header 387 388 if num_cols is None: 389 num_cols = 8 390 391 def _print_options_no_rich(): 392 if not nopretty: 393 print() 394 print(make_header(_header)) 395 ### print actions 396 for i, option in enumerate(_options): 397 marker = '-' if not number_options else (str(i + 1) + '.') 398 if not nopretty: 399 print(f" {marker} ", end="") 400 print(option) 401 if not nopretty: 402 print() 403 404 rich = import_rich() 405 if rich is None or nopretty or no_rich: 406 _print_options_no_rich() 407 return None 408 409 ### Prevent too many options from being truncated on small terminals. 410 if adjust_cols and _options: 411 _cols, _lines = get_cols_lines() 412 while num_cols > 1: 413 cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols) 414 num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options]) 415 if num_too_big > int(len(_options) / 3): 416 num_cols -= 1 417 continue 418 break 419 420 from meerschaum.utils.packages import attempt_import 421 rich_table = attempt_import('rich.table') 422 Text = attempt_import('rich.text').Text 423 box = attempt_import('rich.box') 424 Table = rich_table.Table 425 426 if _header is not None: 427 table = Table( 428 title=_header, 429 box=box.SIMPLE, 430 show_header=False, 431 show_footer=False, 432 title_style='', 433 expand = True, 434 ) 435 else: 436 table = Table.grid(padding=(0, 2)) 437 for i in range(num_cols): 438 table.add_column() 439 440 if len(_options) < 12: 441 ### If fewer than 12 items, use a single column 442 for i, option in enumerate(_options): 443 item = highlight_pipes(option) 444 if number_options: 445 item = str(i + 1) + '. ' + item 446 table.add_row(Text.from_ansi(item)) 447 else: 448 ### Otherwise, use multiple columns as before 449 num_rows = (len(_options) + num_cols - 1) // num_cols 450 item_ix = 0 451 for i in range(num_rows): 452 row = [] 453 for j in range(num_cols): 454 index = i + j * num_rows 455 if index < len(_options): 456 item = highlight_pipes(_options[index]) 457 if number_options: 458 item = str(i + 1) + '. ' + item 459 row.append(Text.from_ansi(item)) 460 item_ix += 1 461 else: 462 row.append('') 463 table.add_row(*row) 464 465 get_console().print(table) 466 return None
Print items in an iterable as a fancy table.
Parameters
- options (Optional[Dict[str, Any]], default None): The iterable to be printed.
- nopretty (bool, default False):
If
True
, don't use fancy formatting. - no_rich (bool, default False):
If
True
, don't userich
to format the output. - name (str, default 'options'):
The text in the default header after
'Available'
. - header (Optional[str], default None):
If provided, override
name
and use this as the header text. - num_cols (Optional[int], default None):
How many columns in the table. Depends on the terminal size. If
None
, use 8. - adjust_cols (bool, default True):
If
True
, adjust the number of columns depending on the terminal size. - sort_options (bool, default False):
If
True
, print the options in sorted order. - number_options (bool, default False):
If
True
, print the option's number in the list (1 index).
433def print_pipes_results( 434 pipes_results: Dict[mrsm.Pipe, SuccessTuple], 435 success_header: Optional[str] = 'Successes', 436 failure_header: Optional[str] = 'Failures', 437 nopretty: bool = False, 438 **kwargs: Any 439 ) -> None: 440 """ 441 Print the pipes and their result SuccessTuples. 442 443 Parameters 444 ---------- 445 pipes_results: Dict[mrsm.Pipe, SuccessTuple] 446 A dictionary mapping pipes to their resulting SuccessTuples. 447 448 success_header: Optional[str], default 'Successes' 449 The header to print above the successful pipes. 450 451 failure_header: Optional[str], default 'Fails' 452 The header to print above the failed pipes. 453 454 kwargs: Any 455 All other keyword arguments are passed to `meerschaum.utils.misc.print_options`. 456 """ 457 from meerschaum.utils.misc import print_options 458 successes = [pipe for pipe, (success, msg) in pipes_results.items() if success] 459 fails = [pipe for pipe, (success, msg) in pipes_results.items() if success] 460 success_options = [ 461 format_pipe_success_tuple(pipe, success_tuple, nopretty=nopretty) 462 for pipe, success_tuple in pipes_results.items() 463 if success_tuple[0] 464 ] 465 failure_options = [ 466 format_pipe_success_tuple(pipe, success_tuple, nopretty=nopretty) 467 for pipe, success_tuple in pipes_results.items() 468 if not success_tuple[0] 469 ] 470 471 if success_options: 472 print_options( 473 success_options, 474 header = success_header, 475 nopretty = nopretty, 476 **kwargs 477 ) 478 if failure_options: 479 print_options( 480 failure_options, 481 header = failure_header, 482 nopretty = nopretty, 483 **kwargs 484 )
Print the pipes and their result SuccessTuples.
Parameters
- pipes_results (Dict[mrsm.Pipe, SuccessTuple]): A dictionary mapping pipes to their resulting SuccessTuples.
- success_header (Optional[str], default 'Successes'): The header to print above the successful pipes.
- failure_header (Optional[str], default 'Fails'): The header to print above the failed pipes.
- kwargs (Any):
All other keyword arguments are passed to
meerschaum.utils.misc.print_options
.
219def print_tuple( 220 tup: mrsm.SuccessTuple, 221 skip_common: bool = True, 222 common_only: bool = False, 223 upper_padding: int = 1, 224 lower_padding: int = 1, 225 left_padding: int = 1, 226 calm: bool = False, 227 _progress: Optional['rich.progress.Progress'] = None, 228) -> None: 229 """ 230 Format `meerschaum.utils.typing.SuccessTuple`. 231 232 Parameters 233 ---------- 234 skip_common: bool, default True 235 If `True`, do not print common success tuples (i.e. `(True, "Success")`). 236 237 common_only: bool, default False 238 If `True`, only print if the success tuple is common. 239 240 upper_padding: int, default 0 241 How many newlines to prepend to the message. 242 243 lower_padding: int, default 0 244 How many newlines to append to the message. 245 246 left_padding: int, default 1 247 How mant spaces to preprend to the message. 248 249 calm: bool, default False 250 If `True`, use the default emoji and color scheme. 251 252 """ 253 from meerschaum._internal.static import STATIC_CONFIG 254 do_print = True 255 256 omit_messages = STATIC_CONFIG['system']['success']['ignore'] 257 258 if common_only: 259 skip_common = False 260 do_print = tup[1].strip() in omit_messages 261 262 if skip_common: 263 do_print = tup[1].strip() not in omit_messages 264 265 if not do_print: 266 return 267 268 print(format_success_tuple( 269 tup, 270 upper_padding=upper_padding, 271 lower_padding=lower_padding, 272 calm=calm, 273 _progress=_progress, 274 ))
Format meerschaum.utils.typing.SuccessTuple
.
Parameters
- skip_common (bool, default True):
If
True
, do not print common success tuples (i.e.(True, "Success")
). - common_only (bool, default False):
If
True
, only print if the success tuple is common. - upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How mant spaces to preprend to the message.
- calm (bool, default False):
If
True
, use the default emoji and color scheme.
60def translate_rich_to_termcolor(*colors) -> tuple: 61 """Translate between rich and more_termcolor terminology.""" 62 _colors = [] 63 for c in colors: 64 _c_list = [] 65 ### handle 'bright' 66 c = c.replace('bright_', 'bright ') 67 68 ### handle 'on' 69 if ' on ' in c: 70 _on = c.split(' on ') 71 _colors.append(_on[0]) 72 for _c in _on[1:]: 73 _c_list.append('on ' + _c) 74 else: 75 _c_list += [c] 76 77 _colors += _c_list 78 79 return tuple(_colors)
Translate between rich and more_termcolor terminology.