meerschaum.utils.formatting
Utilities for formatting output text
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Utilities for formatting output text 7""" 8 9from __future__ import annotations 10import platform 11import os 12import sys 13import meerschaum as mrsm 14from meerschaum.utils.typing import Optional, Union, Any, Dict 15from meerschaum.utils.formatting._shell import make_header 16from meerschaum.utils.formatting._pprint import pprint 17from meerschaum.utils.formatting._pipes import ( 18 pprint_pipes, 19 highlight_pipes, 20 format_pipe_success_tuple, 21 print_pipes_results, 22 extract_stats_from_message, 23 pipe_repr, 24) 25from meerschaum.utils.threading import Lock, RLock 26 27_attrs = { 28 'ANSI': None, 29 'UNICODE': None, 30 'CHARSET': None, 31} 32__all__ = sorted([ 33 'ANSI', 'CHARSET', 'UNICODE', 34 'colored', 35 'translate_rich_to_termcolor', 36 'get_console', 37 'format_success_tuple', 38 'print_tuple', 39 'print_options', 40 'fill_ansi', 41 'pprint', 42 'highlight_pipes', 43 'pprint_pipes', 44 'make_header', 45 'pipe_repr', 46 'print_pipes_results', 47 'extract_stats_from_message', 48]) 49__pdoc__ = {} 50_locks = { 51 '_colorama_init': RLock(), 52} 53 54 55def colored_fallback(*args, **kw): 56 return ' '.join(args) 57 58def translate_rich_to_termcolor(*colors) -> tuple: 59 """Translate between rich and more_termcolor terminology. 60 This is probably prone to breaking. 61 62 Parameters 63 ---------- 64 *colors : 65 66 67 Returns 68 ------- 69 70 """ 71 _colors = [] 72 for c in colors: 73 _c_list = [] 74 ### handle 'bright' 75 c = c.replace('bright_', 'bright ') 76 77 ### handle 'on' 78 if ' on ' in c: 79 _on = c.split(' on ') 80 _colors.append(_on[0]) 81 for _c in _on[1:]: 82 _c_list.append('on ' + _c) 83 else: 84 _c_list += [c] 85 86 _colors += _c_list 87 88 return tuple(_colors) 89 90 91def rich_text_to_str(text: 'rich.text.Text') -> str: 92 """Convert a `rich.text.Text` object to a string with ANSI in-tact.""" 93 _console = get_console() 94 if _console is None: 95 return str(text) 96 with console.capture() as cap: 97 console.print(text) 98 string = cap.get() 99 return string[:-1] 100 101 102def _init(): 103 """ 104 Initial color settings (mostly for Windows). 105 """ 106 if platform.system() != "Windows": 107 return 108 if 'PYTHONIOENCODING' not in os.environ: 109 os.environ['PYTHONIOENCODING'] = 'utf-8' 110 if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ: 111 os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8' 112 sys.stdin.reconfigure(encoding='utf-8') 113 sys.stdout.reconfigure(encoding='utf-8') 114 sys.stderr.reconfigure(encoding='utf-8') 115 116 from ctypes import windll 117 k = windll.kernel32 118 k.SetConsoleMode(k.GetStdHandle(-11), 7) 119 os.system("color") 120 121 from meerschaum.utils.packages import attempt_import 122 ### init colorama for Windows color output 123 colorama, more_termcolor = attempt_import( 124 'colorama', 125 'more_termcolor', 126 lazy = False, 127 warn = False, 128 color = False, 129 ) 130 try: 131 colorama.init(autoreset=False) 132 success = True 133 except Exception as e: 134 import traceback 135 traceback.print_exc() 136 _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii' 137 success = False 138 139 if more_termcolor is None: 140 _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii' 141 success = False 142 143 return success 144 145_colorama_init = False 146def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']: 147 """Apply colors and rich styles to a string. 148 If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string. 149 Otherwise attempt to use the legacy `more_termcolor.colored` method. 150 151 Parameters 152 ---------- 153 text: str 154 The string to apply formatting to. 155 156 *colors: 157 A list of colors to pass to `more_termcolor.colored()`. 158 Has no effect if `style` is provided. 159 160 style: str, default None 161 If provided, pass to `rich` for processing. 162 163 as_rich_text: bool, default False 164 If `True`, return a `rich.Text` object. 165 `style` must be provided. 166 167 **kw: 168 Keyword arguments to pass to `rich.text.Text` or `more_termcolor`. 169 170 171 Returns 172 ------- 173 An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`. 174 175 """ 176 from meerschaum.utils.packages import import_rich, attempt_import 177 global _colorama_init 178 _init() 179 with _locks['_colorama_init']: 180 if not _colorama_init: 181 _colorama_init = _init() 182 183 if 'style' in kw: 184 rich = import_rich() 185 rich_text = attempt_import('rich.text') 186 text_obj = rich_text.Text(text, **kw) 187 if as_rich_text: 188 return text_obj 189 return rich_text_to_str(text_obj) 190 191 more_termcolor = attempt_import('more_termcolor', lazy=False) 192 try: 193 colored_text = more_termcolor.colored(text, *colors, **kw) 194 except Exception as e: 195 colored_text = None 196 197 if colored_text is not None: 198 return colored_text 199 200 try: 201 _colors = translate_rich_to_termcolor(*colors) 202 colored_text = more_termcolor.colored(text, *_colors, **kw) 203 except Exception as e: 204 colored_text = None 205 206 if colored_text is None: 207 ### NOTE: warn here? 208 return text 209 210 return colored_text 211 212console = None 213def get_console(): 214 """Get the rich console.""" 215 global console 216 from meerschaum.utils.packages import import_rich, attempt_import 217 rich = import_rich() 218 rich_console = attempt_import('rich.console') 219 try: 220 console = rich_console.Console(force_terminal=True, color_system='truecolor') 221 except Exception as e: 222 console = None 223 return console 224 225 226def print_tuple( 227 tup: mrsm.SuccessTuple, 228 skip_common: bool = True, 229 common_only: bool = False, 230 upper_padding: int = 0, 231 lower_padding: int = 0, 232 left_padding: int = 1, 233 calm: bool = False, 234 _progress: Optional['rich.progress.Progress'] = None, 235) -> None: 236 """ 237 Format `meerschaum.utils.typing.SuccessTuple`. 238 239 Parameters 240 ---------- 241 skip_common: bool, default True 242 If `True`, do not print common success tuples (i.e. `(True, "Success")`). 243 244 common_only: bool, default False 245 If `True`, only print if the success tuple is common. 246 247 upper_padding: int, default 0 248 How many newlines to prepend to the message. 249 250 lower_padding: int, default 0 251 How many newlines to append to the message. 252 253 left_padding: int, default 1 254 How mant spaces to preprend to the message. 255 256 calm: bool, default False 257 If `True`, use the default emoji and color scheme. 258 259 """ 260 from meerschaum.config.static import STATIC_CONFIG 261 do_print = True 262 263 omit_messages = STATIC_CONFIG['system']['success']['ignore'] 264 265 if common_only: 266 skip_common = False 267 do_print = tup[1] in omit_messages 268 269 if skip_common: 270 do_print = tup[1] not in omit_messages 271 272 if not do_print: 273 return 274 275 print(format_success_tuple( 276 tup, 277 upper_padding=upper_padding, 278 lower_padding=lower_padding, 279 calm=calm, 280 _progress=_progress, 281 )) 282 283 284def format_success_tuple( 285 tup: mrsm.SuccessTuple, 286 upper_padding: int = 0, 287 lower_padding: int = 0, 288 left_padding: int = 1, 289 calm: bool = False, 290 _progress: Optional['rich.progress.Progress'] = None, 291) -> str: 292 """ 293 Format `meerschaum.utils.typing.SuccessTuple`. 294 295 Parameters 296 ---------- 297 upper_padding: int, default 0 298 How many newlines to prepend to the message. 299 300 lower_padding: int, default 0 301 How many newlines to append to the message. 302 303 left_padding: int, default 1 304 How mant spaces to preprend to the message. 305 306 calm: bool, default False 307 If `True`, use the default emoji and color scheme. 308 """ 309 from meerschaum.config.static import STATIC_CONFIG 310 _init() 311 try: 312 status = 'success' if tup[0] else 'failure' 313 except TypeError: 314 status = 'failure' 315 tup = None, None 316 317 if calm: 318 status += '_calm' 319 320 ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET') 321 from meerschaum.config import get_config 322 status_config = get_config('formatting', status, patch=True) 323 324 msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(tup[1]) 325 lines = msg.split('\n') 326 lines = [lines[0]] + [ 327 ((' ' + line if not line.startswith(' ') else line)) 328 for line in lines[1:] 329 ] 330 if ANSI: 331 lines[0] = fill_ansi(highlight_pipes(lines[0]), **status_config['ansi']['rich']) 332 333 msg = '\n'.join(lines) 334 msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding) 335 return msg 336 337 338def print_options( 339 options: Optional[Dict[str, Any]] = None, 340 nopretty: bool = False, 341 no_rich: bool = False, 342 name: str = 'options', 343 header: Optional[str] = None, 344 num_cols: Optional[int] = None, 345 adjust_cols: bool = True, 346 sort_options: bool = False, 347 **kw 348) -> None: 349 """ 350 Print items in an iterable as a fancy table. 351 352 Parameters 353 ---------- 354 options: Optional[Dict[str, Any]], default None 355 The iterable to be printed. 356 357 nopretty: bool, default False 358 If `True`, don't use fancy formatting. 359 360 no_rich: bool, default False 361 If `True`, don't use `rich` to format the output. 362 363 name: str, default 'options' 364 The text in the default header after `'Available'`. 365 366 header: Optional[str], default None 367 If provided, override `name` and use this as the header text. 368 369 num_cols: Optional[int], default None 370 How many columns in the table. Depends on the terminal size. If `None`, use 8. 371 372 adjust_cols: bool, default True 373 If `True`, adjust the number of columns depending on the terminal size. 374 375 """ 376 import os 377 from meerschaum.utils.packages import import_rich 378 from meerschaum.utils.formatting import make_header, highlight_pipes 379 from meerschaum.actions import actions as _actions 380 from meerschaum.utils.misc import get_cols_lines, string_width, iterate_chunks 381 382 383 if options is None: 384 options = {} 385 _options = [] 386 for o in options: 387 _options.append(str(o)) 388 if sort_options: 389 _options = sorted(_options) 390 _header = f"\nAvailable {name}" if header is None else header 391 392 if num_cols is None: 393 num_cols = 8 394 395 def _print_options_no_rich(): 396 if not nopretty: 397 print() 398 print(make_header(_header)) 399 ### print actions 400 for option in _options: 401 if not nopretty: 402 print(" - ", end="") 403 print(option) 404 if not nopretty: 405 print() 406 407 rich = import_rich() 408 if rich is None or nopretty or no_rich: 409 _print_options_no_rich() 410 return None 411 412 ### Prevent too many options from being truncated on small terminals. 413 if adjust_cols and _options: 414 _cols, _lines = get_cols_lines() 415 while num_cols > 1: 416 cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols) 417 num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options]) 418 if num_too_big > int(len(_options) / 3): 419 num_cols -= 1 420 continue 421 break 422 423 from meerschaum.utils.formatting import pprint, get_console 424 from meerschaum.utils.packages import attempt_import 425 rich_columns = attempt_import('rich.columns') 426 rich_panel = attempt_import('rich.panel') 427 rich_table = attempt_import('rich.table') 428 Text = attempt_import('rich.text').Text 429 box = attempt_import('rich.box') 430 Panel = rich_panel.Panel 431 Columns = rich_columns.Columns 432 Table = rich_table.Table 433 434 if _header is not None: 435 table = Table( 436 title = _header, 437 box = box.SIMPLE, 438 show_header = False, 439 show_footer = False, 440 title_style = '', 441 expand = True, 442 ) 443 else: 444 table = Table.grid(padding=(0, 2)) 445 for i in range(num_cols): 446 table.add_column() 447 448 if len(_options) < 12: 449 # If fewer than 12 items, use a single column 450 for option in _options: 451 table.add_row(Text.from_ansi(highlight_pipes(option))) 452 else: 453 # Otherwise, use multiple columns as before 454 num_rows = (len(_options) + num_cols - 1) // num_cols 455 for i in range(num_rows): 456 row = [] 457 for j in range(num_cols): 458 index = i + j * num_rows 459 if index < len(_options): 460 row.append(Text.from_ansi(highlight_pipes(_options[index]))) 461 else: 462 row.append('') 463 table.add_row(*row) 464 465 get_console().print(table) 466 return None 467 468 469def fill_ansi(string: str, style: str = '') -> str: 470 """ 471 Fill in non-formatted segments of ANSI text. 472 473 Parameters 474 ---------- 475 string: str 476 A string which contains ANSI escape codes. 477 478 style: str 479 Style arguments to pass to `rich.text.Text`. 480 481 Returns 482 ------- 483 A string with ANSI styling applied to the segments which don't yet have a style applied. 484 """ 485 from meerschaum.utils.packages import import_rich, attempt_import 486 from meerschaum.utils.misc import iterate_chunks 487 rich = import_rich() 488 Text = attempt_import('rich.text').Text 489 try: 490 msg = Text.from_ansi(string) 491 except AttributeError as e: 492 import traceback 493 traceback.print_stack() 494 msg = '' 495 plain_indices = [] 496 for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)): 497 left = left_span.end 498 right = right_span.start if not isinstance(right_span, int) else right_span 499 if left != right: 500 plain_indices.append((left, right)) 501 if msg.spans: 502 if msg.spans[0].start != 0: 503 plain_indices = [(0, msg.spans[0].start)] + plain_indices 504 if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg): 505 plain_indices.append((msg.spans[-1].end, len(msg))) 506 507 if plain_indices: 508 for left, right in plain_indices: 509 msg.stylize(style, left, right) 510 else: 511 msg = Text(str(msg), style) 512 513 return rich_text_to_str(msg) 514 515 516def __getattr__(name: str) -> str: 517 """ 518 Lazily load module-level variables. 519 """ 520 if name.startswith('__') and name.endswith('__'): 521 raise AttributeError("Cannot import dunders from this module.") 522 523 if name in _attrs: 524 if _attrs[name] is not None: 525 return _attrs[name] 526 from meerschaum.config import get_config 527 if name.lower() in get_config('formatting'): 528 _attrs[name] = get_config('formatting', name.lower()) 529 elif name == 'CHARSET': 530 _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii' 531 return _attrs[name] 532 533 if name == '__wrapped__': 534 import sys 535 return sys.modules[__name__] 536 if name == '__all__': 537 return __all__ 538 539 try: 540 return globals()[name] 541 except KeyError: 542 raise AttributeError(f"Could not find '{name}'")
147def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']: 148 """Apply colors and rich styles to a string. 149 If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string. 150 Otherwise attempt to use the legacy `more_termcolor.colored` method. 151 152 Parameters 153 ---------- 154 text: str 155 The string to apply formatting to. 156 157 *colors: 158 A list of colors to pass to `more_termcolor.colored()`. 159 Has no effect if `style` is provided. 160 161 style: str, default None 162 If provided, pass to `rich` for processing. 163 164 as_rich_text: bool, default False 165 If `True`, return a `rich.Text` object. 166 `style` must be provided. 167 168 **kw: 169 Keyword arguments to pass to `rich.text.Text` or `more_termcolor`. 170 171 172 Returns 173 ------- 174 An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`. 175 176 """ 177 from meerschaum.utils.packages import import_rich, attempt_import 178 global _colorama_init 179 _init() 180 with _locks['_colorama_init']: 181 if not _colorama_init: 182 _colorama_init = _init() 183 184 if 'style' in kw: 185 rich = import_rich() 186 rich_text = attempt_import('rich.text') 187 text_obj = rich_text.Text(text, **kw) 188 if as_rich_text: 189 return text_obj 190 return rich_text_to_str(text_obj) 191 192 more_termcolor = attempt_import('more_termcolor', lazy=False) 193 try: 194 colored_text = more_termcolor.colored(text, *colors, **kw) 195 except Exception as e: 196 colored_text = None 197 198 if colored_text is not None: 199 return colored_text 200 201 try: 202 _colors = translate_rich_to_termcolor(*colors) 203 colored_text = more_termcolor.colored(text, *_colors, **kw) 204 except Exception as e: 205 colored_text = None 206 207 if colored_text is None: 208 ### NOTE: warn here? 209 return text 210 211 return colored_text
Apply colors and rich styles to a string.
If a style
keyword is provided, a rich.text.Text
object will be parsed into a string.
Otherwise attempt to use the legacy more_termcolor.colored
method.
Parameters
- text (str): The string to apply formatting to.
- *colors:: A list of colors to pass to
more_termcolor.colored()
. Has no effect ifstyle
is provided. - style (str, default None):
If provided, pass to
rich
for processing. - as_rich_text (bool, default False):
If
True
, return arich.Text
object.style
must be provided. - **kw:: Keyword arguments to pass to
rich.text.Text
ormore_termcolor
.
Returns
- An ANSI-formatted string or a
rich.text.Text
object ifas_rich_text
isTrue
.
485def extract_stats_from_message( 486 message: str, 487 stat_keys: Optional[List[str]] = None, 488 ) -> Dict[str, int]: 489 """ 490 Given a sync message, return the insert, update, upsert stats from within. 491 492 Parameters 493 ---------- 494 message: str 495 The message to parse for statistics. 496 497 stat_keys: Optional[List[str]], default None 498 If provided, search for these words (case insensitive) in the message. 499 Defaults to `['inserted', 'updated', 'upserted']`. 500 501 Returns 502 ------- 503 A dictionary mapping the stat keys to the total number of rows affected. 504 """ 505 stat_keys = stat_keys or ['inserted', 'updated', 'upserted'] 506 lines_stats = [extract_stats_from_line(line, stat_keys) for line in message.split('\n')] 507 message_stats = { 508 stat_key: sum(stats.get(stat_key, 0) for stats in lines_stats) 509 for stat_key in stat_keys 510 } 511 return message_stats
Given a sync message, return the insert, update, upsert stats from within.
Parameters
- message (str): The message to parse for statistics.
- stat_keys (Optional[List[str]], default None):
If provided, search for these words (case insensitive) in the message.
Defaults to
['inserted', 'updated', 'upserted']
.
Returns
- A dictionary mapping the stat keys to the total number of rows affected.
470def fill_ansi(string: str, style: str = '') -> str: 471 """ 472 Fill in non-formatted segments of ANSI text. 473 474 Parameters 475 ---------- 476 string: str 477 A string which contains ANSI escape codes. 478 479 style: str 480 Style arguments to pass to `rich.text.Text`. 481 482 Returns 483 ------- 484 A string with ANSI styling applied to the segments which don't yet have a style applied. 485 """ 486 from meerschaum.utils.packages import import_rich, attempt_import 487 from meerschaum.utils.misc import iterate_chunks 488 rich = import_rich() 489 Text = attempt_import('rich.text').Text 490 try: 491 msg = Text.from_ansi(string) 492 except AttributeError as e: 493 import traceback 494 traceback.print_stack() 495 msg = '' 496 plain_indices = [] 497 for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)): 498 left = left_span.end 499 right = right_span.start if not isinstance(right_span, int) else right_span 500 if left != right: 501 plain_indices.append((left, right)) 502 if msg.spans: 503 if msg.spans[0].start != 0: 504 plain_indices = [(0, msg.spans[0].start)] + plain_indices 505 if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg): 506 plain_indices.append((msg.spans[-1].end, len(msg))) 507 508 if plain_indices: 509 for left, right in plain_indices: 510 msg.stylize(style, left, right) 511 else: 512 msg = Text(str(msg), style) 513 514 return rich_text_to_str(msg)
Fill in non-formatted segments of ANSI text.
Parameters
- string (str): A string which contains ANSI escape codes.
- style (str):
Style arguments to pass to
rich.text.Text
.
Returns
- A string with ANSI styling applied to the segments which don't yet have a style applied.
285def format_success_tuple( 286 tup: mrsm.SuccessTuple, 287 upper_padding: int = 0, 288 lower_padding: int = 0, 289 left_padding: int = 1, 290 calm: bool = False, 291 _progress: Optional['rich.progress.Progress'] = None, 292) -> str: 293 """ 294 Format `meerschaum.utils.typing.SuccessTuple`. 295 296 Parameters 297 ---------- 298 upper_padding: int, default 0 299 How many newlines to prepend to the message. 300 301 lower_padding: int, default 0 302 How many newlines to append to the message. 303 304 left_padding: int, default 1 305 How mant spaces to preprend to the message. 306 307 calm: bool, default False 308 If `True`, use the default emoji and color scheme. 309 """ 310 from meerschaum.config.static import STATIC_CONFIG 311 _init() 312 try: 313 status = 'success' if tup[0] else 'failure' 314 except TypeError: 315 status = 'failure' 316 tup = None, None 317 318 if calm: 319 status += '_calm' 320 321 ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET') 322 from meerschaum.config import get_config 323 status_config = get_config('formatting', status, patch=True) 324 325 msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(tup[1]) 326 lines = msg.split('\n') 327 lines = [lines[0]] + [ 328 ((' ' + line if not line.startswith(' ') else line)) 329 for line in lines[1:] 330 ] 331 if ANSI: 332 lines[0] = fill_ansi(highlight_pipes(lines[0]), **status_config['ansi']['rich']) 333 334 msg = '\n'.join(lines) 335 msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding) 336 return msg
Format meerschaum.SuccessTuple
.
Parameters
- upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How mant spaces to preprend to the message.
- calm (bool, default False):
If
True
, use the default emoji and color scheme.
214def get_console(): 215 """Get the rich console.""" 216 global console 217 from meerschaum.utils.packages import import_rich, attempt_import 218 rich = import_rich() 219 rich_console = attempt_import('rich.console') 220 try: 221 console = rich_console.Console(force_terminal=True, color_system='truecolor') 222 except Exception as e: 223 console = None 224 return console
Get the rich console.
330def highlight_pipes(message: str) -> str: 331 """ 332 Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects. 333 """ 334 if 'Pipe(' not in message: 335 return message 336 337 from meerschaum import Pipe 338 segments = message.split('Pipe(') 339 msg = '' 340 _d = {} 341 for i, segment in enumerate(segments): 342 comma_index = segment.find(',') 343 paren_index = segment.find(')') 344 single_quote_index = segment.find("'") 345 double_quote_index = segment.find('"') 346 347 has_comma = comma_index != -1 348 has_paren = paren_index != -1 349 has_single_quote = single_quote_index != -1 350 has_double_quote = double_quote_index != -1 351 has_quote = has_single_quote or has_double_quote 352 quote_index = ( 353 min(single_quote_index, double_quote_index) 354 if has_double_quote and has_single_quote 355 else (single_quote_index if has_single_quote else double_quote_index) 356 ) 357 358 has_pipe = ( 359 has_comma 360 and 361 has_paren 362 and 363 has_quote 364 and not 365 (comma_index > paren_index or quote_index > paren_index) 366 ) 367 368 if has_pipe: 369 code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1] 370 try: 371 exec(code) 372 _to_add = pipe_repr(_d['pipe']) + segment[paren_index + 1:] 373 _ = _d.pop('pipe', None) 374 except Exception as e: 375 _to_add = 'Pipe(' + segment 376 msg += _to_add 377 continue 378 msg += segment 379 return msg
Add syntax highlighting to an info message containing stringified meerschaum.Pipe
objects.
14def make_header( 15 message : str, 16 ruler : str = '─', 17 ) -> str: 18 """Format a message string with a ruler. 19 Length of the ruler is the length of the longest word. 20 21 Example: 22 'My\nheader' -> 'My\nheader\n──────' 23 """ 24 25 from meerschaum.utils.formatting import ANSI, UNICODE, colored 26 if not UNICODE: 27 ruler = '-' 28 words = message.split('\n') 29 max_length = 0 30 for w in words: 31 length = len(w) 32 if length > max_length: 33 max_length = length 34 35 s = message + "\n" 36 for i in range(max_length): 37 s += ruler 38 return s
Format a message string with a ruler. Length of the ruler is the length of the longest word.
Example:
'My
header' -> 'My header ──────'
279def pipe_repr( 280 pipe: mrsm.Pipe, 281 as_rich_text: bool = False, 282 ansi: Optional[bool] = None, 283 ) -> Union[str, 'rich.text.Text']: 284 """ 285 Return a formatted string for representing a `meerschaum.Pipe`. 286 """ 287 from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str 288 from meerschaum.utils.packages import import_rich, attempt_import 289 rich = import_rich() 290 Text = attempt_import('rich.text').Text 291 292 styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles') 293 if not ANSI or (ansi is False): 294 styles = {k: '' for k in styles} 295 _pipe_style_prefix, _pipe_style_suffix = ( 296 (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe'] 297 else ('', '') 298 ) 299 text_obj = ( 300 Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix) 301 + colored(("'" + pipe.connector_keys + "'"), style=styles['connector'], as_rich_text=True) 302 + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix) 303 + colored(("'" + pipe.metric_key + "'"), style=styles['metric'], as_rich_text=True) 304 + ( 305 ( 306 colored(', ', style=styles['punctuation'], as_rich_text=True) 307 + colored( 308 ("'" + pipe.location_key + "'"), 309 style=styles['location'], as_rich_text=True 310 ) 311 ) if pipe.location_key is not None 312 else colored('', style='', as_rich_text=True) 313 ) + ( 314 ( ### Add the `instance=` argument. 315 colored(', instance=', style=styles['punctuation'], as_rich_text=True) 316 + colored( 317 ("'" + pipe.instance_keys + "'"), 318 style=styles['instance'], as_rich_text=True 319 ) 320 ) if pipe.instance_keys != get_config('meerschaum', 'instance') 321 else colored('', style='', as_rich_text=True) 322 ) 323 + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix) 324 ) 325 if as_rich_text: 326 return text_obj 327 return rich_text_to_str(text_obj)
Return a formatted string for representing a meerschaum.Pipe
.
10def pprint( 11 *args, 12 detect_password: bool = True, 13 nopretty: bool = False, 14 **kw 15 ) -> None: 16 """Pretty print an object according to the configured ANSI and UNICODE settings. 17 If detect_password is True (default), search and replace passwords with '*' characters. 18 Does not mutate objects. 19 """ 20 from meerschaum.utils.packages import attempt_import, import_rich 21 from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple 22 from meerschaum.utils.warnings import error 23 from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords 24 from collections import OrderedDict 25 import copy, json 26 27 if ( 28 len(args) == 1 29 and 30 isinstance(args[0], tuple) 31 and 32 len(args[0]) == 2 33 and 34 isinstance(args[0][0], bool) 35 and 36 isinstance(args[0][1], str) 37 ): 38 return print_tuple(args[0]) 39 40 modify = True 41 rich_pprint = None 42 if ANSI and not nopretty: 43 rich = import_rich() 44 if rich is not None: 45 rich_pretty = attempt_import('rich.pretty') 46 if rich_pretty is not None: 47 def _rich_pprint(*args, **kw): 48 _console = get_console() 49 _kw = filter_keywords(_console.print, **kw) 50 _console.print(*args, **_kw) 51 rich_pprint = _rich_pprint 52 elif not nopretty: 53 pprintpp = attempt_import('pprintpp', warn=False) 54 try: 55 _pprint = pprintpp.pprint 56 except Exception as e: 57 import pprint as _pprint_module 58 _pprint = _pprint_module.pprint 59 60 func = ( 61 _pprint if rich_pprint is None else rich_pprint 62 ) if not nopretty else print 63 64 try: 65 args_copy = copy.deepcopy(args) 66 except Exception as e: 67 args_copy = args 68 modify = False 69 _args = [] 70 for a in args: 71 c = a 72 ### convert OrderedDict into dict 73 if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict): 74 c = dict_from_od(copy.deepcopy(c)) 75 _args.append(c) 76 args = _args 77 78 _args = list(args) 79 if detect_password and modify: 80 _args = [] 81 for a in args: 82 c = a 83 if isinstance(c, dict): 84 c = replace_password(copy.deepcopy(c)) 85 if nopretty: 86 try: 87 c = json.dumps(c) 88 is_json = True 89 except Exception as e: 90 is_json = False 91 if not is_json: 92 try: 93 c = str(c) 94 except Exception as e: 95 pass 96 _args.append(c) 97 98 ### filter out unsupported keywords 99 func_kw = filter_keywords(func, **kw) if not nopretty else {} 100 error_msg = None 101 try: 102 func(*_args, **func_kw) 103 except Exception as e: 104 error_msg = e 105 if error_msg is not None: 106 error(error_msg)
Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.
16def pprint_pipes(pipes: PipesDict) -> None: 17 """Print a stylized tree of a Pipes dictionary. 18 Supports ANSI and UNICODE global settings.""" 19 from meerschaum.utils.warnings import error 20 from meerschaum.utils.packages import attempt_import, import_rich 21 from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict 22 from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console 23 import copy 24 rich = import_rich('rich', warn=False) 25 Text = None 26 if rich is not None: 27 rich_text = attempt_import('rich.text', lazy=False) 28 Text = rich_text.Text 29 30 icons = get_config('formatting', 'pipes', CHARSET, 'icons') 31 styles = get_config('formatting', 'pipes', 'ansi', 'styles') 32 if not ANSI: 33 styles = {k: '' for k in styles} 34 print() 35 36 def ascii_print_pipes(): 37 """Print the dictionary with no unicode allowed. Also works in case rich fails to import 38 (though rich should auto-install when `attempt_import()` is called).""" 39 asciitree = attempt_import('asciitree') 40 ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}} 41 for conn_keys, metrics in pipes.items(): 42 _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector']) 43 if Text is not None: 44 replace_dict['connector'][_colored_conn_key] = ( 45 Text(conn_keys, style=styles['connector']) 46 ) 47 ascii_dict[_colored_conn_key] = {} 48 for metric, locations in metrics.items(): 49 _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric']) 50 if Text is not None: 51 replace_dict['metric'][_colored_metric_key] = ( 52 Text(metric, style=styles['metric']) 53 ) 54 ascii_dict[_colored_conn_key][_colored_metric_key] = {} 55 for location, pipe in locations.items(): 56 _location_style = styles[('none' if location is None else 'location')] 57 pipe_addendum = '\n ' + pipe.__repr__() + '\n' 58 _colored_location = colored( 59 icons['location'] + str(location), style=_location_style 60 ) 61 _colored_location_key = _colored_location + pipe_addendum 62 if Text is not None: 63 replace_dict['location'][_colored_location] = ( 64 Text(str(location), style=_location_style) 65 ) 66 ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {} 67 68 tree = asciitree.LeftAligned() 69 output = '' 70 cols = [] 71 72 ### This is pretty terrible, unreadable code. 73 ### Please know that I'm normally better than this. 74 key_str = ( 75 (Text(" ") if Text is not None else " ") + 76 ( 77 Text("Key", style='underline') if Text is not None else 78 colored("Key", style='underline') 79 ) + (Text('\n\n ') if Text is not None else '\n\n ') + 80 ( 81 Text("Connector", style=styles['connector']) if Text is not None else 82 colored("Connector", style=styles['connector']) 83 ) + (Text('\n +-- ') if Text is not None else '\n +-- ') + 84 ( 85 Text("Metric", style=styles['metric']) if Text is not None else 86 colored("Metric", style=styles['metric']) 87 ) + (Text('\n +-- ') if Text is not None else '\n +-- ') + 88 ( 89 Text("Location", style=styles['location']) if Text is not None else 90 colored("Location", style=styles['location']) 91 ) + (Text('\n\n') if Text is not None else '\n\n') 92 ) 93 94 output += str(key_str) 95 cols.append(key_str) 96 97 def replace_tree_text(tree_str : str) -> Text: 98 """Replace the colored words with stylized Text instead. 99 Is not executed if ANSI and UNICODE are disabled.""" 100 tree_text = Text(tree_str) if Text is not None else None 101 for k, v in replace_dict.items(): 102 for _colored, _text in v.items(): 103 parts = [] 104 lines = tree_text.split(_colored) 105 for part in lines: 106 parts += [part, _text] 107 if lines[-1] != Text(''): 108 parts = parts[:-1] 109 _tree_text = Text('') 110 for part in parts: 111 _tree_text += part 112 tree_text = _tree_text 113 return tree_text 114 115 tree_output = "" 116 for k, v in ascii_dict.items(): 117 branch = {k : v} 118 tree_output += tree(branch) + '\n\n' 119 if not UNICODE and not ANSI: 120 _col = (Text(tree(branch)) if Text is not None else tree(branch)) 121 else: 122 _col = replace_tree_text(tree(branch)) 123 cols.append(_col) 124 if len(output) > 0: 125 tree_output = tree_output[:-2] 126 output += tree_output 127 128 if rich is None: 129 return print(output) 130 131 rich_columns = attempt_import('rich.columns') 132 Columns = rich_columns.Columns 133 columns = Columns(cols) 134 get_console().print(columns) 135 136 if not UNICODE: 137 return ascii_print_pipes() 138 139 rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import( 140 'rich.panel', 141 'rich.tree', 142 'rich.text', 143 'rich.columns', 144 'rich.table', 145 ) 146 from rich import box 147 Panel = rich_panel.Panel 148 Tree = rich_tree.Tree 149 Text = rich_text.Text 150 Columns = rich_columns.Columns 151 Table = rich_table.Table 152 153 key_panel = Panel( 154 ( 155 Text("\n") + 156 Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") + 157 Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") + 158 Text(icons['location'] + "Location", style=styles['location']) + Text("\n") 159 ), 160 title = Text(icons['key'] + "Keys", style=styles['guide']), 161 border_style = styles['guide'], 162 expand = True 163 ) 164 165 cols = [] 166 conn_trees = {} 167 metric_trees = {} 168 pipes = sorted_dict(pipes) 169 for conn_keys, metrics in pipes.items(): 170 conn_trees[conn_keys] = Tree( 171 Text( 172 icons['connector'] + conn_keys, 173 style = styles['connector'], 174 ), 175 guide_style = styles['connector'] 176 ) 177 metric_trees[conn_keys] = {} 178 for metric, locations in metrics.items(): 179 metric_trees[conn_keys][metric] = Tree( 180 Text( 181 icons['metric'] + metric, 182 style = styles['metric'] 183 ), 184 guide_style = styles['metric'] 185 ) 186 conn_trees[conn_keys].add(metric_trees[conn_keys][metric]) 187 for location, pipe in locations.items(): 188 _location = ( 189 Text(str(location), style=styles['none']) if location is None 190 else Text(location, style=styles['location']) 191 ) 192 _location = ( 193 Text(icons['location']) 194 + _location + Text('\n') 195 + pipe_repr(pipe, as_rich_text=True) + Text('\n') 196 ) 197 metric_trees[conn_keys][metric].add(_location) 198 199 cols += [key_panel] 200 for k, t in conn_trees.items(): 201 cols.append(t) 202 203 columns = Columns(cols) 204 get_console().print(columns)
Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.
339def print_options( 340 options: Optional[Dict[str, Any]] = None, 341 nopretty: bool = False, 342 no_rich: bool = False, 343 name: str = 'options', 344 header: Optional[str] = None, 345 num_cols: Optional[int] = None, 346 adjust_cols: bool = True, 347 sort_options: bool = False, 348 **kw 349) -> None: 350 """ 351 Print items in an iterable as a fancy table. 352 353 Parameters 354 ---------- 355 options: Optional[Dict[str, Any]], default None 356 The iterable to be printed. 357 358 nopretty: bool, default False 359 If `True`, don't use fancy formatting. 360 361 no_rich: bool, default False 362 If `True`, don't use `rich` to format the output. 363 364 name: str, default 'options' 365 The text in the default header after `'Available'`. 366 367 header: Optional[str], default None 368 If provided, override `name` and use this as the header text. 369 370 num_cols: Optional[int], default None 371 How many columns in the table. Depends on the terminal size. If `None`, use 8. 372 373 adjust_cols: bool, default True 374 If `True`, adjust the number of columns depending on the terminal size. 375 376 """ 377 import os 378 from meerschaum.utils.packages import import_rich 379 from meerschaum.utils.formatting import make_header, highlight_pipes 380 from meerschaum.actions import actions as _actions 381 from meerschaum.utils.misc import get_cols_lines, string_width, iterate_chunks 382 383 384 if options is None: 385 options = {} 386 _options = [] 387 for o in options: 388 _options.append(str(o)) 389 if sort_options: 390 _options = sorted(_options) 391 _header = f"\nAvailable {name}" if header is None else header 392 393 if num_cols is None: 394 num_cols = 8 395 396 def _print_options_no_rich(): 397 if not nopretty: 398 print() 399 print(make_header(_header)) 400 ### print actions 401 for option in _options: 402 if not nopretty: 403 print(" - ", end="") 404 print(option) 405 if not nopretty: 406 print() 407 408 rich = import_rich() 409 if rich is None or nopretty or no_rich: 410 _print_options_no_rich() 411 return None 412 413 ### Prevent too many options from being truncated on small terminals. 414 if adjust_cols and _options: 415 _cols, _lines = get_cols_lines() 416 while num_cols > 1: 417 cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols) 418 num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options]) 419 if num_too_big > int(len(_options) / 3): 420 num_cols -= 1 421 continue 422 break 423 424 from meerschaum.utils.formatting import pprint, get_console 425 from meerschaum.utils.packages import attempt_import 426 rich_columns = attempt_import('rich.columns') 427 rich_panel = attempt_import('rich.panel') 428 rich_table = attempt_import('rich.table') 429 Text = attempt_import('rich.text').Text 430 box = attempt_import('rich.box') 431 Panel = rich_panel.Panel 432 Columns = rich_columns.Columns 433 Table = rich_table.Table 434 435 if _header is not None: 436 table = Table( 437 title = _header, 438 box = box.SIMPLE, 439 show_header = False, 440 show_footer = False, 441 title_style = '', 442 expand = True, 443 ) 444 else: 445 table = Table.grid(padding=(0, 2)) 446 for i in range(num_cols): 447 table.add_column() 448 449 if len(_options) < 12: 450 # If fewer than 12 items, use a single column 451 for option in _options: 452 table.add_row(Text.from_ansi(highlight_pipes(option))) 453 else: 454 # Otherwise, use multiple columns as before 455 num_rows = (len(_options) + num_cols - 1) // num_cols 456 for i in range(num_rows): 457 row = [] 458 for j in range(num_cols): 459 index = i + j * num_rows 460 if index < len(_options): 461 row.append(Text.from_ansi(highlight_pipes(_options[index]))) 462 else: 463 row.append('') 464 table.add_row(*row) 465 466 get_console().print(table) 467 return None
Print items in an iterable as a fancy table.
Parameters
- options (Optional[Dict[str, Any]], default None): The iterable to be printed.
- nopretty (bool, default False):
If
True
, don't use fancy formatting. - no_rich (bool, default False):
If
True
, don't userich
to format the output. - name (str, default 'options'):
The text in the default header after
'Available'
. - header (Optional[str], default None):
If provided, override
name
and use this as the header text. - num_cols (Optional[int], default None):
How many columns in the table. Depends on the terminal size. If
None
, use 8. - adjust_cols (bool, default True):
If
True
, adjust the number of columns depending on the terminal size.
431def print_pipes_results( 432 pipes_results: Dict[mrsm.Pipe, SuccessTuple], 433 success_header: Optional[str] = 'Successes', 434 failure_header: Optional[str] = 'Failures', 435 nopretty: bool = False, 436 **kwargs: Any 437 ) -> None: 438 """ 439 Print the pipes and their result SuccessTuples. 440 441 Parameters 442 ---------- 443 pipes_results: Dict[mrsm.Pipe, SuccessTuple] 444 A dictionary mapping pipes to their resulting SuccessTuples. 445 446 success_header: Optional[str], default 'Successes' 447 The header to print above the successful pipes. 448 449 failure_header: Optional[str], default 'Fails' 450 The header to print above the failed pipes. 451 452 kwargs: Any 453 All other keyword arguments are passed to `meerschaum.utils.misc.print_options`. 454 """ 455 from meerschaum.utils.misc import print_options 456 successes = [pipe for pipe, (success, msg) in pipes_results.items() if success] 457 fails = [pipe for pipe, (success, msg) in pipes_results.items() if success] 458 success_options = [ 459 format_pipe_success_tuple(pipe, success_tuple, nopretty=nopretty) 460 for pipe, success_tuple in pipes_results.items() 461 if success_tuple[0] 462 ] 463 failure_options = [ 464 format_pipe_success_tuple(pipe, success_tuple, nopretty=nopretty) 465 for pipe, success_tuple in pipes_results.items() 466 if not success_tuple[0] 467 ] 468 469 if success_options: 470 print_options( 471 success_options, 472 header = success_header, 473 nopretty = nopretty, 474 **kwargs 475 ) 476 if failure_options: 477 print_options( 478 failure_options, 479 header = failure_header, 480 nopretty = nopretty, 481 **kwargs 482 )
Print the pipes and their result SuccessTuples.
Parameters
- pipes_results (Dict[mrsm.Pipe, SuccessTuple]): A dictionary mapping pipes to their resulting SuccessTuples.
- success_header (Optional[str], default 'Successes'): The header to print above the successful pipes.
- failure_header (Optional[str], default 'Fails'): The header to print above the failed pipes.
- kwargs (Any):
All other keyword arguments are passed to
meerschaum.utils.misc.print_options
.
227def print_tuple( 228 tup: mrsm.SuccessTuple, 229 skip_common: bool = True, 230 common_only: bool = False, 231 upper_padding: int = 0, 232 lower_padding: int = 0, 233 left_padding: int = 1, 234 calm: bool = False, 235 _progress: Optional['rich.progress.Progress'] = None, 236) -> None: 237 """ 238 Format `meerschaum.utils.typing.SuccessTuple`. 239 240 Parameters 241 ---------- 242 skip_common: bool, default True 243 If `True`, do not print common success tuples (i.e. `(True, "Success")`). 244 245 common_only: bool, default False 246 If `True`, only print if the success tuple is common. 247 248 upper_padding: int, default 0 249 How many newlines to prepend to the message. 250 251 lower_padding: int, default 0 252 How many newlines to append to the message. 253 254 left_padding: int, default 1 255 How mant spaces to preprend to the message. 256 257 calm: bool, default False 258 If `True`, use the default emoji and color scheme. 259 260 """ 261 from meerschaum.config.static import STATIC_CONFIG 262 do_print = True 263 264 omit_messages = STATIC_CONFIG['system']['success']['ignore'] 265 266 if common_only: 267 skip_common = False 268 do_print = tup[1] in omit_messages 269 270 if skip_common: 271 do_print = tup[1] not in omit_messages 272 273 if not do_print: 274 return 275 276 print(format_success_tuple( 277 tup, 278 upper_padding=upper_padding, 279 lower_padding=lower_padding, 280 calm=calm, 281 _progress=_progress, 282 ))
Format meerschaum.SuccessTuple
.
Parameters
- skip_common (bool, default True):
If
True
, do not print common success tuples (i.e.(True, "Success")
). - common_only (bool, default False):
If
True
, only print if the success tuple is common. - upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How mant spaces to preprend to the message.
- calm (bool, default False):
If
True
, use the default emoji and color scheme.
59def translate_rich_to_termcolor(*colors) -> tuple: 60 """Translate between rich and more_termcolor terminology. 61 This is probably prone to breaking. 62 63 Parameters 64 ---------- 65 *colors : 66 67 68 Returns 69 ------- 70 71 """ 72 _colors = [] 73 for c in colors: 74 _c_list = [] 75 ### handle 'bright' 76 c = c.replace('bright_', 'bright ') 77 78 ### handle 'on' 79 if ' on ' in c: 80 _on = c.split(' on ') 81 _colors.append(_on[0]) 82 for _c in _on[1:]: 83 _c_list.append('on ' + _c) 84 else: 85 _c_list += [c] 86 87 _colors += _c_list 88 89 return tuple(_colors)
Translate between rich and more_termcolor terminology. This is probably prone to breaking.
Parameters
- *colors :
- Returns
- -------