meerschaum.utils.formatting
Utilities for formatting output text
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Utilities for formatting output text 7""" 8 9from __future__ import annotations 10import platform 11import os 12import sys 13import meerschaum as mrsm 14from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable 15from meerschaum.utils.formatting._shell import make_header 16from meerschaum.utils.formatting._pprint import pprint 17from meerschaum.utils.formatting._pipes import ( 18 pprint_pipes, 19 highlight_pipes, 20 format_pipe_success_tuple, 21 print_pipes_results, 22 extract_stats_from_message, 23 pipe_repr, 24) 25from meerschaum.utils.threading import Lock, RLock 26 27_attrs = { 28 'ANSI': None, 29 'UNICODE': None, 30 'CHARSET': None, 31 'RESET': '\033[0m', 32} 33__all__ = sorted([ 34 'ANSI', 'CHARSET', 'UNICODE', 'RESET', 35 'colored', 36 'translate_rich_to_termcolor', 37 'get_console', 38 'format_success_tuple', 39 'print_tuple', 40 'print_options', 41 'fill_ansi', 42 'pprint', 43 'highlight_pipes', 44 'pprint_pipes', 45 'make_header', 46 'pipe_repr', 47 'print_pipes_results', 48 'extract_stats_from_message', 49]) 50__pdoc__ = {} 51_locks = { 52 '_colorama_init': RLock(), 53} 54 55 56def colored_fallback(*args, **kw): 57 return ' '.join(args) 58 59def translate_rich_to_termcolor(*colors) -> tuple: 60 """Translate between rich and more_termcolor terminology.""" 61 _colors = [] 62 for c in colors: 63 _c_list = [] 64 ### handle 'bright' 65 c = c.replace('bright_', 'bright ') 66 67 ### handle 'on' 68 if ' on ' in c: 69 _on = c.split(' on ') 70 _colors.append(_on[0]) 71 for _c in _on[1:]: 72 _c_list.append('on ' + _c) 73 else: 74 _c_list += [c] 75 76 _colors += _c_list 77 78 return tuple(_colors) 79 80 81def rich_text_to_str(text: 'rich.text.Text') -> str: 82 """Convert a `rich.text.Text` object to a string with ANSI in-tact.""" 83 _console = get_console() 84 if _console is None: 85 return str(text) 86 with console.capture() as cap: 87 console.print(text) 88 string = cap.get() 
89 return string[:-1] 90 91 92def _init(): 93 """ 94 Initial color settings (mostly for Windows). 95 """ 96 if platform.system() != "Windows": 97 return 98 if 'PYTHONIOENCODING' not in os.environ: 99 os.environ['PYTHONIOENCODING'] = 'utf-8' 100 if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ: 101 os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8' 102 sys.stdin.reconfigure(encoding='utf-8') 103 sys.stdout.reconfigure(encoding='utf-8') 104 sys.stderr.reconfigure(encoding='utf-8') 105 106 from ctypes import windll 107 k = windll.kernel32 108 k.SetConsoleMode(k.GetStdHandle(-11), 7) 109 os.system("color") 110 111 from meerschaum.utils.packages import attempt_import 112 ### init colorama for Windows color output 113 colorama, more_termcolor = attempt_import( 114 'colorama', 115 'more_termcolor', 116 lazy = False, 117 warn = False, 118 color = False, 119 ) 120 try: 121 colorama.init(autoreset=False) 122 success = True 123 except Exception: 124 import traceback 125 traceback.print_exc() 126 _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii' 127 success = False 128 129 if more_termcolor is None: 130 _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii' 131 success = False 132 133 return success 134 135_colorama_init = False 136def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']: 137 """Apply colors and rich styles to a string. 138 If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string. 139 Otherwise attempt to use the legacy `more_termcolor.colored` method. 140 141 Parameters 142 ---------- 143 text: str 144 The string to apply formatting to. 145 146 *colors: 147 A list of colors to pass to `more_termcolor.colored()`. 148 Has no effect if `style` is provided. 149 150 style: str, default None 151 If provided, pass to `rich` for processing. 152 153 as_rich_text: bool, default False 154 If `True`, return a `rich.Text` object. 
155 `style` must be provided. 156 157 **kw: 158 Keyword arguments to pass to `rich.text.Text` or `more_termcolor`. 159 160 161 Returns 162 ------- 163 An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`. 164 165 """ 166 from meerschaum.utils.packages import import_rich, attempt_import 167 global _colorama_init 168 _init() 169 with _locks['_colorama_init']: 170 if not _colorama_init: 171 _colorama_init = _init() 172 173 if 'style' in kw: 174 rich = import_rich() 175 rich_text = attempt_import('rich.text') 176 text_obj = rich_text.Text(text, **kw) 177 if as_rich_text: 178 return text_obj 179 return rich_text_to_str(text_obj) 180 181 more_termcolor = attempt_import('more_termcolor', lazy=False) 182 try: 183 colored_text = more_termcolor.colored(text, *colors, **kw) 184 except Exception as e: 185 colored_text = None 186 187 if colored_text is not None: 188 return colored_text 189 190 try: 191 _colors = translate_rich_to_termcolor(*colors) 192 colored_text = more_termcolor.colored(text, *_colors, **kw) 193 except Exception as e: 194 colored_text = None 195 196 if colored_text is None: 197 ### NOTE: warn here? 198 return text 199 200 return colored_text 201 202console = None 203def get_console(): 204 """Get the rich console.""" 205 global console 206 from meerschaum.utils.packages import import_rich, attempt_import 207 rich = import_rich() 208 rich_console = attempt_import('rich.console') 209 try: 210 console = rich_console.Console(force_terminal=True, color_system='truecolor') 211 except Exception: 212 console = None 213 return console 214 215 216def print_tuple( 217 tup: mrsm.SuccessTuple, 218 skip_common: bool = True, 219 common_only: bool = False, 220 upper_padding: int = 1, 221 lower_padding: int = 1, 222 left_padding: int = 1, 223 calm: bool = False, 224 _progress: Optional['rich.progress.Progress'] = None, 225) -> None: 226 """ 227 Format `meerschaum.utils.typing.SuccessTuple`. 
228 229 Parameters 230 ---------- 231 skip_common: bool, default True 232 If `True`, do not print common success tuples (i.e. `(True, "Success")`). 233 234 common_only: bool, default False 235 If `True`, only print if the success tuple is common. 236 237 upper_padding: int, default 0 238 How many newlines to prepend to the message. 239 240 lower_padding: int, default 0 241 How many newlines to append to the message. 242 243 left_padding: int, default 1 244 How mant spaces to preprend to the message. 245 246 calm: bool, default False 247 If `True`, use the default emoji and color scheme. 248 249 """ 250 from meerschaum.config.static import STATIC_CONFIG 251 do_print = True 252 253 omit_messages = STATIC_CONFIG['system']['success']['ignore'] 254 255 if common_only: 256 skip_common = False 257 do_print = tup[1].strip() in omit_messages 258 259 if skip_common: 260 do_print = tup[1].strip() not in omit_messages 261 262 if not do_print: 263 return 264 265 print(format_success_tuple( 266 tup, 267 upper_padding=upper_padding, 268 lower_padding=lower_padding, 269 calm=calm, 270 _progress=_progress, 271 )) 272 273 274def format_success_tuple( 275 tup: mrsm.SuccessTuple, 276 upper_padding: int = 0, 277 lower_padding: int = 0, 278 left_padding: int = 1, 279 calm: bool = False, 280 _progress: Optional['rich.progress.Progress'] = None, 281) -> str: 282 """ 283 Format `meerschaum.utils.typing.SuccessTuple`. 284 285 Parameters 286 ---------- 287 upper_padding: int, default 0 288 How many newlines to prepend to the message. 289 290 lower_padding: int, default 0 291 How many newlines to append to the message. 292 293 left_padding: int, default 1 294 How mant spaces to preprend to the message. 295 296 calm: bool, default False 297 If `True`, use the default emoji and color scheme. 
298 """ 299 _init() 300 try: 301 status = 'success' if tup[0] else 'failure' 302 except TypeError: 303 status = 'failure' 304 tup = None, None 305 306 if calm: 307 status += '_calm' 308 309 ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET') 310 from meerschaum.config import get_config 311 status_config = get_config('formatting', status, patch=True) 312 313 msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1])) 314 lines = msg.split('\n') 315 lines = [lines[0]] + [ 316 ((' ' + line if not line.startswith(' ') else line)) 317 for line in lines[1:] 318 ] 319 if ANSI: 320 lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich']) 321 322 msg = '\n'.join(lines) 323 msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding) 324 return msg 325 326 327def print_options( 328 options: Optional[Iterable[Any]] = None, 329 nopretty: bool = False, 330 no_rich: bool = False, 331 name: str = 'options', 332 header: Optional[str] = None, 333 num_cols: Optional[int] = None, 334 adjust_cols: bool = True, 335 sort_options: bool = False, 336 number_options: bool = False, 337 **kw 338) -> None: 339 """ 340 Print items in an iterable as a fancy table. 341 342 Parameters 343 ---------- 344 options: Optional[Dict[str, Any]], default None 345 The iterable to be printed. 346 347 nopretty: bool, default False 348 If `True`, don't use fancy formatting. 349 350 no_rich: bool, default False 351 If `True`, don't use `rich` to format the output. 352 353 name: str, default 'options' 354 The text in the default header after `'Available'`. 355 356 header: Optional[str], default None 357 If provided, override `name` and use this as the header text. 358 359 num_cols: Optional[int], default None 360 How many columns in the table. Depends on the terminal size. If `None`, use 8. 361 362 adjust_cols: bool, default True 363 If `True`, adjust the number of columns depending on the terminal size. 
364 365 sort_options: bool, default False 366 If `True`, print the options in sorted order. 367 368 number_options: bool, default False 369 If `True`, print the option's number in the list (1 index). 370 371 """ 372 from meerschaum.utils.packages import import_rich 373 from meerschaum.utils.formatting import highlight_pipes 374 from meerschaum.utils.misc import get_cols_lines, string_width 375 376 if options is None: 377 options = {} 378 _options = [] 379 for o in options: 380 _options.append(str(o)) 381 if sort_options: 382 _options = sorted(_options) 383 _header = f"\nAvailable {name}" if header is None else header 384 385 if num_cols is None: 386 num_cols = 8 387 388 def _print_options_no_rich(): 389 if not nopretty: 390 print() 391 print(make_header(_header)) 392 ### print actions 393 for i, option in enumerate(_options): 394 marker = '-' if not number_options else (str(i + 1) + '.') 395 if not nopretty: 396 print(f" {marker} ", end="") 397 print(option) 398 if not nopretty: 399 print() 400 401 rich = import_rich() 402 if rich is None or nopretty or no_rich: 403 _print_options_no_rich() 404 return None 405 406 ### Prevent too many options from being truncated on small terminals. 
407 if adjust_cols and _options: 408 _cols, _lines = get_cols_lines() 409 while num_cols > 1: 410 cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols) 411 num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options]) 412 if num_too_big > int(len(_options) / 3): 413 num_cols -= 1 414 continue 415 break 416 417 from meerschaum.utils.packages import attempt_import 418 rich_table = attempt_import('rich.table') 419 Text = attempt_import('rich.text').Text 420 box = attempt_import('rich.box') 421 Table = rich_table.Table 422 423 if _header is not None: 424 table = Table( 425 title=_header, 426 box=box.SIMPLE, 427 show_header=False, 428 show_footer=False, 429 title_style='', 430 expand = True, 431 ) 432 else: 433 table = Table.grid(padding=(0, 2)) 434 for i in range(num_cols): 435 table.add_column() 436 437 if len(_options) < 12: 438 ### If fewer than 12 items, use a single column 439 for i, option in enumerate(_options): 440 item = highlight_pipes(option) 441 if number_options: 442 item = str(i + 1) + '. ' + item 443 table.add_row(Text.from_ansi(item)) 444 else: 445 ### Otherwise, use multiple columns as before 446 num_rows = (len(_options) + num_cols - 1) // num_cols 447 item_ix = 0 448 for i in range(num_rows): 449 row = [] 450 for j in range(num_cols): 451 index = i + j * num_rows 452 if index < len(_options): 453 item = highlight_pipes(_options[index]) 454 if number_options: 455 item = str(i + 1) + '. ' + item 456 row.append(Text.from_ansi(item)) 457 item_ix += 1 458 else: 459 row.append('') 460 table.add_row(*row) 461 462 get_console().print(table) 463 return None 464 465 466def fill_ansi(string: str, style: str = '') -> str: 467 """ 468 Fill in non-formatted segments of ANSI text. 469 470 Parameters 471 ---------- 472 string: str 473 A string which contains ANSI escape codes. 474 475 style: str 476 Style arguments to pass to `rich.text.Text`. 
477 478 Returns 479 ------- 480 A string with ANSI styling applied to the segments which don't yet have a style applied. 481 """ 482 from meerschaum.utils.packages import import_rich, attempt_import 483 from meerschaum.utils.misc import iterate_chunks 484 _ = import_rich() 485 rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text') 486 Text = rich_text.Text 487 try: 488 msg = Text.from_ansi(string) 489 except AttributeError: 490 import traceback 491 traceback.print_stack() 492 msg = '' 493 plain_indices = [] 494 for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)): 495 left = left_span.end 496 right = right_span.start if not isinstance(right_span, int) else right_span 497 if left != right: 498 plain_indices.append((left, right)) 499 if msg.spans: 500 if msg.spans[0].start != 0: 501 plain_indices = [(0, msg.spans[0].start)] + plain_indices 502 if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg): 503 plain_indices.append((msg.spans[-1].end, len(msg))) 504 505 if plain_indices: 506 for left, right in plain_indices: 507 msg.stylize(style, left, right) 508 else: 509 msg = Text(str(msg), style) 510 511 return rich_text_to_str(msg) 512 513 514def __getattr__(name: str) -> str: 515 """ 516 Lazily load module-level variables. 
517 """ 518 if name.startswith('__') and name.endswith('__'): 519 raise AttributeError("Cannot import dunders from this module.") 520 521 if name in _attrs: 522 if _attrs[name] is not None: 523 return _attrs[name] 524 from meerschaum.config import get_config 525 if name.lower() in get_config('formatting'): 526 _attrs[name] = get_config('formatting', name.lower()) 527 elif name == 'CHARSET': 528 _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii' 529 return _attrs[name] 530 531 if name == '__wrapped__': 532 import sys 533 return sys.modules[__name__] 534 if name == '__all__': 535 return __all__ 536 537 try: 538 return globals()[name] 539 except KeyError: 540 raise AttributeError(f"Could not find '{name}'")
def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.


    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
    The unmodified `text` is returned if `more_termcolor` cannot apply the colors.

    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Only the guarded initialization is needed: an additional unguarded `_init()`
    ### call re-ran the Windows console setup on every invocation.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        _ = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    ### First try the colors verbatim, then translated from rich terminology.
    for translate in (False, True):
        try:
            _colors = translate_rich_to_termcolor(*colors) if translate else colors
            colored_text = more_termcolor.colored(text, *_colors, **kw)
        except Exception:
            continue
        if colored_text is not None:
            return colored_text

    ### NOTE: warn here?
    return text
Apply colors and rich styles to a string.
If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
Otherwise attempt to use the legacy `more_termcolor.colored` method.
Parameters
- text (str): The string to apply formatting to.
- *colors: A list of colors to pass to `more_termcolor.colored()`. Has no effect if `style` is provided.
- style (str, default None): If provided, pass to `rich` for processing.
- as_rich_text (bool, default False): If `True`, return a `rich.Text` object. `style` must be provided.
- **kw: Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
Returns
- An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
def extract_stats_from_message(
    message: str,
    stat_keys: Optional[List[str]] = None,
) -> Dict[str, int]:
    """
    Total the per-line sync statistics found in a message.

    Parameters
    ----------
    message: str
        The message to parse for statistics. It is split on newlines and
        every line is passed to `extract_stats_from_line()`.

    stat_keys: Optional[List[str]], default None
        The stat keys to total.
        Defaults to `['inserted', 'updated', 'upserted', 'checked']`.

    Returns
    -------
    A dictionary mapping each stat key to the sum of that key's values across all lines
    (lines without a value for a key contribute 0).
    """
    keys = stat_keys or ['inserted', 'updated', 'upserted', 'checked']
    totals = {key: 0 for key in keys}
    for line in message.split('\n'):
        line_stats = extract_stats_from_line(line, keys)
        ### Iterate the (de-duplicated) totals so a repeated key is only counted once per line.
        for key in totals:
            totals[key] = totals[key] + line_stats.get(key, 0)
    return totals
Given a sync message, return the insert, update, upsert stats from within.
Parameters
- message (str): The message to parse for statistics.
- stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to `['inserted', 'updated', 'upserted', 'checked']`.
Returns
- A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If `rich.text.Text.from_ansi` is unavailable, `string` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    _ = import_rich()
    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
    Text = rich_text.Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### The string cannot be parsed, so there are no spans to inspect
        ### (falling back to a plain `str` crashed on `msg.spans` below).
        return string

    plain_indices = []
    ### NOTE(review): spans are consumed in chunks of two (presumably consecutive,
    ### non-overlapping pairs), so a gap between the last span of one chunk and the
    ### first span of the next is never examined. Confirm that this is intended.
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        ### No unstyled ranges were collected: rebuild the text with `style` as its base style.
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)
Fill in non-formatted segments of ANSI text.
Parameters
- string (str): A string which contains ANSI escape codes.
- style (str): Style arguments to pass to `rich.text.Text`.
Returns
- A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Format `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    Returns
    -------
    The formatted message (icon, highlighted text and padding) as a string.
    """
    _init()
    try:
        status = 'success' if tup[0] else 'failure'
    except TypeError:
        ### `tup` is not subscriptable: treat it as a failure without a message.
        status = 'failure'
        tup = None, None

    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    ### `highlight_pipes()` performs a substring test, so it must receive a string
    ### (the fallback above yields a `None` message).
    message = str(tup[1])
    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(message))
    lines = msg.split('\n')
    ### Indent continuation lines which are not already indented.
    lines = [lines[0]] + [
        ((' ' + line if not line.startswith(' ') else line))
        for line in lines[1:]
    ]
    if ANSI:
        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])

    msg = '\n'.join(lines)
    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
    return msg
Format `meerschaum.utils.typing.SuccessTuple`.
Parameters
- upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How many spaces to prepend to the message.
- calm (bool, default False): If `True`, use the default emoji and color scheme.
def get_console():
    """
    Build a fresh `rich` console and publish it as the module-level `console`.

    Returns
    -------
    The new `rich.console.Console` (forced terminal, truecolor),
    or `None` if it could not be constructed.
    """
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    console_module = attempt_import('rich.console')
    try:
        new_console = console_module.Console(force_terminal=True, color_system='truecolor')
    except Exception:
        ### Any failure (including a missing module) results in no console.
        new_console = None
    console = new_console
    return console
Get the rich console.
def highlight_pipes(message: str) -> str:
    """
    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.

    Parameters
    ----------
    message: str
        The message which may contain substrings such as `Pipe('sql:main', 'weather')`.

    Returns
    -------
    The message with every parseable `Pipe(...)` substring replaced by `pipe_repr()` output.
    Text which cannot be parsed as a pipe is left exactly as it was.
    """
    if 'Pipe(' not in message:
        return message

    segments = message.split('Pipe(')
    ### Everything before the first `Pipe(` is ordinary text and is never parsed.
    msg = segments[0]
    _d = {}
    for segment in segments[1:]:
        comma_index = segment.find(',')
        paren_index = segment.find(')')
        single_quote_index = segment.find("'")
        double_quote_index = segment.find('"')

        has_comma = comma_index != -1
        has_paren = paren_index != -1
        has_single_quote = single_quote_index != -1
        has_double_quote = double_quote_index != -1
        has_quote = has_single_quote or has_double_quote
        quote_index = (
            min(single_quote_index, double_quote_index)
            if has_double_quote and has_single_quote
            else (single_quote_index if has_single_quote else double_quote_index)
        )

        ### A pipe needs a comma and a quote which both occur before the closing parenthesis.
        has_pipe = (
            has_comma
            and has_paren
            and has_quote
            and not (comma_index > paren_index or quote_index > paren_index)
        )

        ### Default: restore the delimiter removed by `split()` so no text is lost
        ### (previously non-pipe segments were appended without their `Pipe(` prefix).
        _to_add = 'Pipe(' + segment
        if has_pipe:
            code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1]
            try:
                from meerschaum import Pipe
                ### SECURITY NOTE(review): this executes text taken from the message.
                ### If messages can contain untrusted input, replace `exec` with a
                ### parser restricted to literal arguments.
                exec(code)
                _to_add = pipe_repr(_d['pipe']) + segment[paren_index + 1:]
                _ = _d.pop('pipe', None)
            except Exception:
                _to_add = 'Pipe(' + segment
        msg += _to_add
    return msg
Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.
def make_header(
    message: str,
    ruler: str = '─',
) -> str:
    """Underline a message with a ruler.

    The ruler is repeated once per character of the longest line in `message`
    (lines are separated by newlines). If `UNICODE` is disabled, `'-'` is used
    as the ruler instead of the `ruler` argument.

    Returns the message, a newline and the ruler line.
    """

    from meerschaum.utils.formatting import ANSI, UNICODE, colored
    rule_piece = ruler if UNICODE else '-'
    widest = max(len(line) for line in message.split('\n'))
    return message + "\n" + (rule_piece * widest)
Format a message string with a ruler. The length of the ruler is the length of the longest line of the message.
Example:
`'My\nheader'` -> `'My\nheader\n──────'`
def pipe_repr(
    pipe: mrsm.Pipe,
    as_rich_text: bool = False,
    ansi: Optional[bool] = None,
) -> Union[str, 'rich.text.Text']:
    """
    Build the styled `Pipe('connector', 'metric', ...)` representation of a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose keys are rendered.

    as_rich_text: bool, default False
        If `True`, return the `rich.text.Text` object instead of a string.

    ansi: Optional[bool], default None
        If `False`, all styles are blanked (as they are when `ANSI` is disabled).

    Returns
    -------
    The representation as a `rich.text.Text` object, or as a string with newlines removed.
    The location is only included when it is not `None`, and `instance=` is only
    included when the pipe's instance differs from the configured default instance.
    """
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    Text = attempt_import('rich.text').Text

    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
    if (ansi is False) or not ANSI:
        styles = {k: '' for k in styles}

    if styles['Pipe']:
        _pipe_style_prefix = "[" + styles['Pipe'] + "]"
        _pipe_style_suffix = "[/" + styles['Pipe'] + "]"
    else:
        _pipe_style_prefix, _pipe_style_suffix = '', ''

    def _pipe_markup(fragment):
        """Wrap a literal fragment in the `Pipe` style markup."""
        return Text.from_markup(_pipe_style_prefix + fragment + _pipe_style_suffix)

    def _styled(fragment, style_key):
        """Return a fragment as rich text using one of the configured styles."""
        return colored(fragment, style=styles[style_key], as_rich_text=True)

    opening = _pipe_markup("Pipe(")
    connector_part = _styled("'" + pipe.connector_keys + "'", 'connector')
    separator = _pipe_markup(", ")
    metric_part = _styled("'" + pipe.metric_key + "'", 'metric')

    ### The optional parts are each built as one unit before being appended:
    ### the grouping affects which styles span which characters.
    if pipe.location_key is not None:
        location_part = (
            _styled(', ', 'punctuation')
            + _styled("'" + pipe.location_key + "'", 'location')
        )
    else:
        location_part = colored('', style='', as_rich_text=True)

    if pipe.instance_keys != get_config('meerschaum', 'instance'):
        ### Add the `instance=` argument.
        instance_part = (
            _styled(', instance=', 'punctuation')
            + _styled("'" + pipe.instance_keys + "'", 'instance')
        )
    else:
        instance_part = colored('', style='', as_rich_text=True)

    closing = _pipe_markup(")")

    text_obj = (
        opening
        + connector_part
        + separator
        + metric_part
        + location_part
        + instance_part
        + closing
    )
    if not as_rich_text:
        return rich_text_to_str(text_obj).replace('\n', '')
    return text_obj
Return a formatted string for representing a `meerschaum.Pipe`.
def pprint(
    *args,
    detect_password: bool = True,
    nopretty: bool = False,
    **kw
) -> None:
    """Pretty print an object according to the configured ANSI and UNICODE settings.
    If detect_password is True (default), search and replace passwords with '*' characters.
    Does not mutate objects.

    Parameters
    ----------
    *args:
        The objects to print. A single `(bool, str)` tuple is handed to `print_tuple()`.

    detect_password: bool, default True
        If `True`, pass copies of dictionaries through `replace_password()` before printing.

    nopretty: bool, default False
        If `True`, use the built-in `print` (objects are JSON-encoded where possible
        when `detect_password` is also in effect).

    **kw:
        Keyword arguments filtered with `filter_keywords()` for the chosen print function.
    """
    import copy
    import json
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.formatting import ANSI, get_console, print_tuple
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
    from collections import OrderedDict

    if (
        len(args) == 1
        and isinstance(args[0], tuple)
        and len(args[0]) == 2
        and isinstance(args[0][0], bool)
        and isinstance(args[0][1], str)
    ):
        return print_tuple(args[0], **filter_keywords(print_tuple, **kw))

    modify = True
    rich_pprint = None
    if ANSI and not nopretty:
        rich = import_rich()
        if rich is not None:
            rich_pretty = attempt_import('rich.pretty')
            if rich_pretty is not None:
                def _rich_pprint(*args, **kw):
                    _console = get_console()
                    _kw = filter_keywords(_console.print, **kw)
                    _console.print(*args, **_kw)
                rich_pprint = _rich_pprint

    if nopretty:
        func = print
    elif rich_pprint is not None:
        func = rich_pprint
    else:
        ### Also reached when ANSI is enabled but `rich` could not be imported
        ### (previously `_pprint` was left undefined in that case).
        pprintpp = attempt_import('pprintpp', warn=False)
        try:
            func = pprintpp.pprint
        except Exception:
            import pprint as _pprint_module
            func = _pprint_module.pprint

    ### Probe whether the arguments can be copied; if not, print them unmodified.
    try:
        copy.deepcopy(args)
    except Exception:
        modify = False

    _args = []
    for a in args:
        c = a
        ### convert OrderedDict into dict
        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
            c = dict_from_od(copy.deepcopy(c))
        _args.append(c)
    args = _args

    _args = list(args)
    if detect_password and modify:
        _args = []
        for a in args:
            c = a
            if isinstance(c, dict):
                c = replace_password(copy.deepcopy(c))
            if nopretty:
                try:
                    c = json.dumps(c)
                    is_json = True
                except Exception:
                    is_json = False
                if not is_json:
                    try:
                        c = str(c)
                    except Exception:
                        pass
            _args.append(c)

    ### filter out unsupported keywords
    func_kw = filter_keywords(func, **kw) if not nopretty else {}
    error_msg = None
    try:
        func(*_args, **func_kw)
    except Exception as e:
        error_msg = e
    ### Report outside of the `except` block, as before.
    if error_msg is not None:
        error(error_msg)
Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.
def pprint_pipes(pipes: PipesDict) -> None:
    """Print a stylized tree of a Pipes dictionary.
    Supports ANSI and UNICODE global settings.

    `pipes` is iterated as connector keys -> metric keys -> location keys -> pipe.
    When `UNICODE` is disabled, the tree is drawn with `asciitree`;
    otherwise `rich` trees are printed next to a key panel.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
    import copy
    rich = import_rich('rich', warn=False)
    Text = None
    if rich is not None:
        rich_text = attempt_import('rich.text', lazy=False)
        Text = rich_text.Text

    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
    ### Without ANSI, keep the style keys but blank every style.
    if not ANSI:
        styles = {k: '' for k in styles}
    print()

    def ascii_print_pipes():
        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
        (though rich should auto-install when `attempt_import()` is called)."""
        asciitree = attempt_import('asciitree')
        ### `ascii_dict` is the nested dictionary drawn by asciitree (keys are colored strings);
        ### `replace_dict` maps colored strings to `Text` objects for `replace_tree_text()`.
        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
        for conn_keys, metrics in pipes.items():
            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
            if Text is not None:
                replace_dict['connector'][_colored_conn_key] = (
                    Text(conn_keys, style=styles['connector'])
                )
            ascii_dict[_colored_conn_key] = {}
            for metric, locations in metrics.items():
                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
                if Text is not None:
                    replace_dict['metric'][_colored_metric_key] = (
                        Text(metric, style=styles['metric'])
                    )
                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
                for location, pipe in locations.items():
                    _location_style = styles[('none' if location is None else 'location')]
                    ### The pipe's repr is shown on its own line below the location.
                    pipe_addendum = '\n ' + pipe.__repr__() + '\n'
                    _colored_location = colored(
                        icons['location'] + str(location), style=_location_style
                    )
                    _colored_location_key = _colored_location + pipe_addendum
                    if Text is not None:
                        replace_dict['location'][_colored_location] = (
                            Text(str(location), style=_location_style)
                        )
                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}

        tree = asciitree.LeftAligned()
        output = ''
        cols = []

        ### Build the legend ("Key") which is printed before the trees.
        ### Every piece is a `Text` object when rich is available, otherwise a plain string.
        key_str = (
            (Text(" ") if Text is not None else " ") +
            (
                Text("Key", style='underline') if Text is not None else
                colored("Key", style='underline')
            ) + (Text('\n\n ') if Text is not None else '\n\n ') +
            (
                Text("Connector", style=styles['connector']) if Text is not None else
                colored("Connector", style=styles['connector'])
            ) + (Text('\n +-- ') if Text is not None else '\n +-- ') +
            (
                Text("Metric", style=styles['metric']) if Text is not None else
                colored("Metric", style=styles['metric'])
            ) + (Text('\n +-- ') if Text is not None else '\n +-- ') +
            (
                Text("Location", style=styles['location']) if Text is not None else
                colored("Location", style=styles['location'])
            ) + (Text('\n\n') if Text is not None else '\n\n')
        )

        output += str(key_str)
        cols.append(key_str)

        def replace_tree_text(tree_str : str) -> Text:
            """Replace the colored words with stylized Text instead.
            Is not executed if ANSI and UNICODE are disabled."""
            ### NOTE(review): if `Text` is `None`, `tree_text` is `None`
            ### and the `split()` call below would fail.
            tree_text = Text(tree_str) if Text is not None else None
            for k, v in replace_dict.items():
                for _colored, _text in v.items():
                    ### Split on the colored string and interleave the replacement
                    ### `Text` between the resulting parts.
                    parts = []
                    lines = tree_text.split(_colored)
                    for part in lines:
                        parts += [part, _text]
                    if lines[-1] != Text(''):
                        parts = parts[:-1]
                    _tree_text = Text('')
                    for part in parts:
                        _tree_text += part
                    tree_text = _tree_text
            return tree_text

        ### Draw one tree (and one column) per connector.
        tree_output = ""
        for k, v in ascii_dict.items():
            branch = {k : v}
            tree_output += tree(branch) + '\n\n'
            if not UNICODE and not ANSI:
                _col = (Text(tree(branch)) if Text is not None else tree(branch))
            else:
                _col = replace_tree_text(tree(branch))
            cols.append(_col)
        ### Trim the final two newlines.
        if len(output) > 0:
            tree_output = tree_output[:-2]
        output += tree_output

        ### Without rich, print the plain string output.
        if rich is None:
            return print(output)

        rich_columns = attempt_import('rich.columns')
        Columns = rich_columns.Columns
        columns = Columns(cols)
        get_console().print(columns)

    if not UNICODE:
        return ascii_print_pipes()

    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
        'rich.panel',
        'rich.tree',
        'rich.text',
        'rich.columns',
        'rich.table',
    )
    from rich import box
    Panel = rich_panel.Panel
    Tree = rich_tree.Tree
    Text = rich_text.Text
    Columns = rich_columns.Columns
    Table = rich_table.Table

    ### The legend panel explaining the connector / metric / location icons and styles.
    key_panel = Panel(
        (
            Text("\n") +
            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
        ),
        title = Text(icons['key'] + "Keys", style=styles['guide']),
        border_style = styles['guide'],
        expand = True
    )

    cols = []
    ### One `Tree` per connector, with a sub-tree per metric and a leaf per location.
    conn_trees = {}
    metric_trees = {}
    pipes = sorted_dict(pipes)
    for conn_keys, metrics in pipes.items():
        conn_trees[conn_keys] = Tree(
            Text(
                icons['connector'] + conn_keys,
                style = styles['connector'],
            ),
            guide_style = styles['connector']
        )
        metric_trees[conn_keys] = {}
        for metric, locations in metrics.items():
            metric_trees[conn_keys][metric] = Tree(
                Text(
                    icons['metric'] + metric,
                    style = styles['metric']
                ),
                guide_style = styles['metric']
            )
            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
            for location, pipe in locations.items():
                ### A `None` location is rendered as its string form with the 'none' style.
                _location = (
                    Text(str(location), style=styles['none']) if location is None
                    else Text(location, style=styles['location'])
                )
                ### Each leaf shows the location followed by the pipe's repr on the next line.
                _location = (
                    Text(icons['location'])
                    + _location + Text('\n')
                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
                )
                metric_trees[conn_keys][metric].add(_location)

    ### The key panel comes first, followed by one column per connector tree.
    cols += [key_panel]
    for k, t in conn_trees.items():
        cols.append(t)

    columns = Columns(cols)
    get_console().print(columns)
Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.
def print_options(
    options: Optional[Iterable[Any]] = None,
    nopretty: bool = False,
    no_rich: bool = False,
    name: str = 'options',
    header: Optional[str] = None,
    num_cols: Optional[int] = None,
    adjust_cols: bool = True,
    sort_options: bool = False,
    number_options: bool = False,
    **kw
) -> None:
    """
    Print items in an iterable as a fancy table.

    Parameters
    ----------
    options: Optional[Iterable[Any]], default None
        The iterable to be printed. Each item is converted with `str()`.

    nopretty: bool, default False
        If `True`, don't use fancy formatting.

    no_rich: bool, default False
        If `True`, don't use `rich` to format the output.

    name: str, default 'options'
        The text in the default header after `'Available'`.

    header: Optional[str], default None
        If provided, override `name` and use this as the header text.

    num_cols: Optional[int], default None
        How many columns in the table. Depends on the terminal size. If `None`, use 8.

    adjust_cols: bool, default True
        If `True`, adjust the number of columns depending on the terminal size.

    sort_options: bool, default False
        If `True`, print the options in sorted order.

    number_options: bool, default False
        If `True`, print the option's number in the list (1 index).

    **kw
        Additional keyword arguments are accepted but not used by this function.

    """
    from meerschaum.utils.packages import import_rich
    from meerschaum.utils.formatting import highlight_pipes
    from meerschaum.utils.misc import get_cols_lines, string_width

    if options is None:
        options = []
    _options = [str(o) for o in options]
    if sort_options:
        _options = sorted(_options)
    _header = f"\nAvailable {name}" if header is None else header

    if num_cols is None:
        num_cols = 8

    def _print_options_no_rich():
        """Print one option per line without `rich`."""
        if not nopretty:
            print()
            print(make_header(_header))
        ### print actions
        for i, option in enumerate(_options):
            marker = '-' if not number_options else (str(i + 1) + '.')
            if not nopretty:
                print(f" {marker} ", end="")
            print(option)
        if not nopretty:
            print()

    rich = import_rich()
    if rich is None or nopretty or no_rich:
        _print_options_no_rich()
        return None

    ### Prevent too many options from being truncated on small terminals.
    if adjust_cols and _options:
        _cols, _ = get_cols_lines()
        while num_cols > 1:
            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
            num_too_big = sum(1 for o in _options if string_width(o) > cell_len)
            ### Drop a column while more than a third of the options overflow a cell.
            if num_too_big > int(len(_options) / 3):
                num_cols -= 1
                continue
            break

    from meerschaum.utils.packages import attempt_import
    rich_table = attempt_import('rich.table')
    Text = attempt_import('rich.text').Text
    box = attempt_import('rich.box')
    Table = rich_table.Table

    ### NOTE: `_header` is always a string at this point,
    ### so the grid fallback below is not expected to run.
    if _header is not None:
        table = Table(
            title=_header,
            box=box.SIMPLE,
            show_header=False,
            show_footer=False,
            title_style='',
            expand=True,
        )
    else:
        table = Table.grid(padding=(0, 2))
    for _ in range(num_cols):
        table.add_column()

    if len(_options) < 12:
        ### If fewer than 12 items, use a single column
        for i, option in enumerate(_options):
            item = highlight_pipes(option)
            if number_options:
                item = str(i + 1) + '. ' + item
            table.add_row(Text.from_ansi(item))
    else:
        ### Otherwise, lay the options out column by column (column-major order).
        num_rows = (len(_options) + num_cols - 1) // num_cols
        for i in range(num_rows):
            row = []
            for j in range(num_cols):
                index = i + j * num_rows
                if index < len(_options):
                    item = highlight_pipes(_options[index])
                    if number_options:
                        ### Number by the option's position in the list, not by the row.
                        item = str(index + 1) + '. ' + item
                    row.append(Text.from_ansi(item))
                else:
                    row.append('')
            table.add_row(*row)

    get_console().print(table)
    return None
Print items in an iterable as a fancy table.
Parameters
- options (Optional[Iterable[Any]], default None): The iterable to be printed.
- nopretty (bool, default False):
If
True
, don't use fancy formatting. - no_rich (bool, default False):
If
True
, don't use rich
to format the output. - name (str, default 'options'):
The text in the default header after
'Available'
. - header (Optional[str], default None):
If provided, override
name
and use this as the header text. - num_cols (Optional[int], default None):
How many columns in the table. Depends on the terminal size. If
None
, use 8. - adjust_cols (bool, default True):
If
True
, adjust the number of columns depending on the terminal size. - sort_options (bool, default False):
If
True
, print the options in sorted order. - number_options (bool, default False):
If
True
, print the option's number in the list (1 index).
def print_pipes_results(
        pipes_results: Dict[mrsm.Pipe, SuccessTuple],
        success_header: Optional[str] = 'Successes',
        failure_header: Optional[str] = 'Failures',
        nopretty: bool = False,
        **kwargs: Any
    ) -> None:
    """
    Print the pipes and their result SuccessTuples.

    Parameters
    ----------
    pipes_results: Dict[mrsm.Pipe, SuccessTuple]
        A dictionary mapping pipes to their resulting SuccessTuples.

    success_header: Optional[str], default 'Successes'
        The header to print above the successful pipes.

    failure_header: Optional[str], default 'Failures'
        The header to print above the failed pipes.

    nopretty: bool, default False
        Passed to both `format_pipe_success_tuple` and `print_options`.

    kwargs: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.print_options`.
    """
    from meerschaum.utils.misc import print_options

    ### Split the formatted results by the success flag (first tuple element).
    success_options, failure_options = [], []
    for pipe, success_tuple in pipes_results.items():
        formatted = format_pipe_success_tuple(pipe, success_tuple, nopretty=nopretty)
        if success_tuple[0]:
            success_options.append(formatted)
        else:
            failure_options.append(formatted)

    ### Each section is printed only when it has at least one entry.
    if success_options:
        print_options(
            success_options,
            header=success_header,
            nopretty=nopretty,
            **kwargs
        )
    if failure_options:
        print_options(
            failure_options,
            header=failure_header,
            nopretty=nopretty,
            **kwargs
        )
Print the pipes and their result SuccessTuples.
Parameters
- pipes_results (Dict[mrsm.Pipe, SuccessTuple]): A dictionary mapping pipes to their resulting SuccessTuples.
- success_header (Optional[str], default 'Successes'): The header to print above the successful pipes.
- failure_header (Optional[str], default 'Failures'): The header to print above the failed pipes.
- kwargs (Any):
All other keyword arguments are passed to
meerschaum.utils.misc.print_options
.
def print_tuple(
    tup: mrsm.SuccessTuple,
    skip_common: bool = True,
    common_only: bool = False,
    upper_padding: int = 1,
    lower_padding: int = 1,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> None:
    """
    Print a formatted `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success tuple to print. The message (`tup[1]`) is stripped and compared
        against the messages in `STATIC_CONFIG['system']['success']['ignore']`.

    skip_common: bool, default True
        If `True`, do not print when the message is one of the ignored (common) messages.

    common_only: bool, default False
        If `True`, only print when the message is one of the ignored (common) messages.
        Takes precedence over `skip_common`.

    upper_padding: int, default 1
        How many newlines to prepend to the message.

    lower_padding: int, default 1
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.
        NOTE(review): this value is not forwarded to `format_success_tuple` here;
        confirm whether that is intentional.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    """
    from meerschaum.config.static import STATIC_CONFIG

    ignored_messages = STATIC_CONFIG['system']['success']['ignore']

    ### The message is only inspected when one of the filters is active.
    if common_only:
        should_print = tup[1].strip() in ignored_messages
    elif skip_common:
        should_print = tup[1].strip() not in ignored_messages
    else:
        should_print = True

    if should_print:
        formatted = format_success_tuple(
            tup,
            upper_padding=upper_padding,
            lower_padding=lower_padding,
            calm=calm,
            _progress=_progress,
        )
        print(formatted)
Format meerschaum.utils.typing.SuccessTuple
.
Parameters
- skip_common (bool, default True):
If
True
, do not print common success tuples (i.e. (True, "Success")
). - common_only (bool, default False):
If
True
, only print if the success tuple is common. - upper_padding (int, default 1): How many newlines to prepend to the message.
- lower_padding (int, default 1): How many newlines to append to the message.
- left_padding (int, default 1): How many spaces to prepend to the message.
- calm (bool, default False):
If
True
, use the default emoji and color scheme.
def translate_rich_to_termcolor(*colors) -> tuple:
    """
    Translate between rich and more_termcolor terminology.

    Each color string has `'bright_'` replaced with `'bright '` and is then split
    on `' on '`: the first piece is kept as-is and every following piece
    is emitted as its own `'on <color>'` entry.

    Returns
    -------
    A tuple of the translated color strings, in input order.
    """
    translated = []
    for color in colors:
        ### handle 'bright'
        normalized = color.replace('bright_', 'bright ')

        ### handle 'on' (a string without ' on ' yields only the first piece)
        first, *backgrounds = normalized.split(' on ')
        translated.append(first)
        translated.extend('on ' + background for background in backgrounds)

    return tuple(translated)
Translate between rich and more_termcolor terminology.