meerschaum.utils.formatting
Utilities for formatting output text
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utilities for formatting output text
"""

from __future__ import annotations
import platform
import os
import sys
import meerschaum as mrsm
from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable
from meerschaum.utils.formatting._shell import make_header
from meerschaum.utils.formatting._pprint import pprint
from meerschaum.utils.formatting._pipes import (
    pprint_pipes,
    highlight_pipes,
    format_pipe_success_tuple,
    print_pipes_results,
    extract_stats_from_message,
    pipe_repr,
)
from meerschaum.utils.threading import Lock, RLock

### Lazily-resolved module attributes (see `__getattr__` below).
### `None` means "not yet read from the configuration".
_attrs = {
    'ANSI': None,
    'UNICODE': None,
    'CHARSET': None,
    'RESET': '\033[0m',
}
__all__ = sorted([
    'ANSI', 'CHARSET', 'UNICODE', 'RESET',
    'colored',
    'translate_rich_to_termcolor',
    'get_console',
    'format_success_tuple',
    'print_tuple',
    'print_options',
    'fill_ansi',
    'pprint',
    'highlight_pipes',
    'pprint_pipes',
    'make_header',
    'pipe_repr',
    'print_pipes_results',
    'extract_stats_from_message',
])
__pdoc__ = {}
_locks = {
    '_colorama_init': RLock(),
}


def colored_fallback(*args, **kw):
    """Join the positional arguments with spaces, ignoring any styling keywords."""
    return ' '.join(args)


def translate_rich_to_termcolor(*colors) -> tuple:
    """Translate between rich and more_termcolor terminology.
    This is probably prone to breaking.

    Parameters
    ----------
    *colors:
        Color strings in `rich` terminology (e.g. `'bright_red on blue'`).

    Returns
    -------
    A tuple of color strings in which `'bright_'` has been replaced with `'bright '`
    and each `' on '` clause has been split into its own `'on <color>'` item.
    """
    _colors = []
    for c in colors:
        _c_list = []
        ### handle 'bright'
        c = c.replace('bright_', 'bright ')

        ### handle 'on'
        if ' on ' in c:
            _on = c.split(' on ')
            _colors.append(_on[0])
            for _c in _on[1:]:
                _c_list.append('on ' + _c)
        else:
            _c_list += [c]

        _colors += _c_list

    return tuple(_colors)


def rich_text_to_str(text: 'rich.text.Text') -> str:
    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
    _console = get_console()
    if _console is None:
        return str(text)
    ### Use the console which was just checked for `None`
    ### rather than the module-level `console` global.
    with _console.capture() as cap:
        _console.print(text)
    string = cap.get()
    ### Drop the final character of the captured output.
    return string[:-1]


def _init():
    """
    Initial color settings (mostly for Windows).

    Returns `None` immediately on non-Windows platforms.
    On Windows, returns `True` if `colorama` was initialized and `more_termcolor`
    could be imported, otherwise `False` (and ANSI / Unicode output is disabled).
    """
    if platform.system() != "Windows":
        return
    if 'PYTHONIOENCODING' not in os.environ:
        os.environ['PYTHONIOENCODING'] = 'utf-8'
    if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ:
        os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8'
    sys.stdin.reconfigure(encoding='utf-8')
    sys.stdout.reconfigure(encoding='utf-8')
    sys.stderr.reconfigure(encoding='utf-8')

    from ctypes import windll
    k = windll.kernel32
    k.SetConsoleMode(k.GetStdHandle(-11), 7)
    os.system("color")

    from meerschaum.utils.packages import attempt_import
    ### init colorama for Windows color output
    colorama, more_termcolor = attempt_import(
        'colorama',
        'more_termcolor',
        lazy = False,
        warn = False,
        color = False,
    )
    try:
        colorama.init(autoreset=False)
        success = True
    except Exception:
        import traceback
        traceback.print_exc()
        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
        success = False

    if more_termcolor is None:
        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
        success = False

    return success


_colorama_init = False
def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.


    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.

    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Only run the (potentially expensive) terminal setup until it succeeds;
    ### an unconditional `_init()` here would defeat this guard.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        rich = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    try:
        colored_text = more_termcolor.colored(text, *colors, **kw)
    except Exception:
        colored_text = None

    if colored_text is not None:
        return colored_text

    ### The colors may have been given in `rich` terminology, so translate and retry.
    try:
        _colors = translate_rich_to_termcolor(*colors)
        colored_text = more_termcolor.colored(text, *_colors, **kw)
    except Exception:
        colored_text = None

    if colored_text is None:
        ### NOTE: warn here?
        return text

    return colored_text


console = None
def get_console():
    """Get the rich console."""
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    rich_console = attempt_import('rich.console')
    try:
        console = rich_console.Console(force_terminal=True, color_system='truecolor')
    except Exception:
        console = None
    return console


def print_tuple(
    tup: mrsm.SuccessTuple,
    skip_common: bool = True,
    common_only: bool = False,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> None:
    """
    Format `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    skip_common: bool, default True
        If `True`, do not print common success tuples (i.e. `(True, "Success")`).

    common_only: bool, default False
        If `True`, only print if the success tuple is common.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    """
    from meerschaum.config.static import STATIC_CONFIG
    do_print = True

    omit_messages = STATIC_CONFIG['system']['success']['ignore']

    if common_only:
        skip_common = False
        do_print = tup[1].strip() in omit_messages

    if skip_common:
        do_print = tup[1].strip() not in omit_messages

    if not do_print:
        return

    print(format_success_tuple(
        tup,
        upper_padding=upper_padding,
        lower_padding=lower_padding,
        left_padding=left_padding,
        calm=calm,
        _progress=_progress,
    ))


def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Format `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    Returns
    -------
    The formatted message as a string.
    """
    from meerschaum.config.static import STATIC_CONFIG
    _init()
    try:
        status = 'success' if tup[0] else 'failure'
    except TypeError:
        ### `tup` is not subscriptable, so treat it as a failure with an empty payload.
        status = 'failure'
        tup = None, None

    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    ### Coerce the message to a string so the `(None, None)` fallback above
    ### does not raise inside `highlight_pipes()`.
    msg = (
        (' ' * left_padding)
        + status_config[CHARSET]['icon']
        + ' '
        + str(highlight_pipes(str(tup[1])))
    )
    lines = msg.split('\n')
    lines = [lines[0]] + [
        ((' ' + line if not line.startswith(' ') else line))
        for line in lines[1:]
    ]
    if ANSI:
        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])

    msg = '\n'.join(lines)
    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
    return msg


def print_options(
    options: Optional[Iterable[Any]] = None,
    nopretty: bool = False,
    no_rich: bool = False,
    name: str = 'options',
    header: Optional[str] = None,
    num_cols: Optional[int] = None,
    adjust_cols: bool = True,
    sort_options: bool = False,
    number_options: bool = False,
    **kw
) -> None:
    """
    Print items in an iterable as a fancy table.

    Parameters
    ----------
    options: Optional[Iterable[Any]], default None
        The iterable to be printed.

    nopretty: bool, default False
        If `True`, don't use fancy formatting.

    no_rich: bool, default False
        If `True`, don't use `rich` to format the output.

    name: str, default 'options'
        The text in the default header after `'Available'`.

    header: Optional[str], default None
        If provided, override `name` and use this as the header text.

    num_cols: Optional[int], default None
        How many columns in the table. Depends on the terminal size. If `None`, use 8.

    adjust_cols: bool, default True
        If `True`, adjust the number of columns depending on the terminal size.

    sort_options: bool, default False
        If `True`, print the options in sorted order.

    number_options: bool, default False
        If `True`, print the option's number in the list (1 index).

    """
    import os
    from meerschaum.utils.packages import import_rich
    from meerschaum.utils.formatting import make_header, highlight_pipes
    from meerschaum.actions import actions as _actions
    from meerschaum.utils.misc import get_cols_lines, string_width, iterate_chunks

    if options is None:
        options = {}
    _options = [str(o) for o in options]
    if sort_options:
        _options = sorted(_options)
    _header = f"\nAvailable {name}" if header is None else header

    if num_cols is None:
        num_cols = 8

    def _print_options_no_rich():
        if not nopretty:
            print()
            print(make_header(_header))
        ### print actions
        for i, option in enumerate(_options):
            marker = '-' if not number_options else (str(i + 1) + '.')
            if not nopretty:
                print(f" {marker} ", end="")
            print(option)
        if not nopretty:
            print()

    rich = import_rich()
    if rich is None or nopretty or no_rich:
        _print_options_no_rich()
        return None

    ### Prevent too many options from being truncated on small terminals.
    if adjust_cols and _options:
        _cols, _lines = get_cols_lines()
        while num_cols > 1:
            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
            num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options])
            ### Drop a column while more than a third of the options would not fit.
            if num_too_big > int(len(_options) / 3):
                num_cols -= 1
                continue
            break

    from meerschaum.utils.formatting import pprint, get_console
    from meerschaum.utils.packages import attempt_import
    rich_columns = attempt_import('rich.columns')
    rich_panel = attempt_import('rich.panel')
    rich_table = attempt_import('rich.table')
    Text = attempt_import('rich.text').Text
    box = attempt_import('rich.box')
    Panel = rich_panel.Panel
    Columns = rich_columns.Columns
    Table = rich_table.Table

    if _header is not None:
        table = Table(
            title=_header,
            box=box.SIMPLE,
            show_header=False,
            show_footer=False,
            title_style='',
            expand = True,
        )
    else:
        table = Table.grid(padding=(0, 2))
    for i in range(num_cols):
        table.add_column()

    if len(_options) < 12:
        ### If fewer than 12 items, use a single column
        for i, option in enumerate(_options):
            item = highlight_pipes(option)
            if number_options:
                item = str(i + 1) + '. ' + item
            table.add_row(Text.from_ansi(item))
    else:
        ### Otherwise, use multiple columns (filled top-to-bottom, then left-to-right).
        num_rows = (len(_options) + num_cols - 1) // num_cols
        for i in range(num_rows):
            row = []
            for j in range(num_cols):
                index = i + j * num_rows
                if index < len(_options):
                    item = highlight_pipes(_options[index])
                    if number_options:
                        ### Number by the option's position in the list, not by the row.
                        item = str(index + 1) + '. ' + item
                    row.append(Text.from_ansi(item))
                else:
                    row.append('')
            table.add_row(*row)

    get_console().print(table)
    return None


def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If the string cannot be parsed (`AttributeError`), it is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    rich = import_rich()
    Text = attempt_import('rich.text').Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### A plain `''` has no `.spans`, so continuing would only raise again.
        return string

    plain_indices = []
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)


def __getattr__(name: str) -> Any:
    """
    Lazily load module-level variables.

    Names in `_attrs` (`ANSI`, `UNICODE`, `CHARSET`, `RESET`) are read from the
    `formatting` configuration on first access and cached in `_attrs`.
    Dunder names and unknown names raise `AttributeError`.
    """
    if name.startswith('__') and name.endswith('__'):
        raise AttributeError("Cannot import dunders from this module.")

    if name in _attrs:
        if _attrs[name] is not None:
            return _attrs[name]
        from meerschaum.config import get_config
        if name.lower() in get_config('formatting'):
            _attrs[name] = get_config('formatting', name.lower())
        elif name == 'CHARSET':
            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
        return _attrs[name]

    try:
        return globals()[name]
    except KeyError:
        raise AttributeError(f"Could not find '{name}'")
def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.


    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
    If neither attempt with `more_termcolor` succeeds, `text` is returned unchanged.

    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Only run the terminal setup until it reports success;
    ### an unconditional `_init()` before this guard would repeat the work on every call.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        rich = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    try:
        colored_text = more_termcolor.colored(text, *colors, **kw)
    except Exception:
        colored_text = None

    if colored_text is not None:
        return colored_text

    ### The colors may have been given in `rich` terminology, so translate and retry.
    try:
        _colors = translate_rich_to_termcolor(*colors)
        colored_text = more_termcolor.colored(text, *_colors, **kw)
    except Exception:
        colored_text = None

    if colored_text is None:
        ### NOTE: warn here?
        return text

    return colored_text
Apply colors and rich styles to a string.
If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
Otherwise attempt to use the legacy `more_termcolor.colored` method.
Parameters
- text (str): The string to apply formatting to.
- *colors: A list of colors to pass to `more_termcolor.colored()`. Has no effect if `style` is provided.
- style (str, default None): If provided, pass to `rich` for processing.
- as_rich_text (bool, default False): If `True`, return a `rich.Text` object. `style` must be provided.
- **kw: Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
Returns
- An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
def extract_stats_from_message(
    message: str,
    stat_keys: Optional[List[str]] = None,
) -> Dict[str, int]:
    """
    Total up the per-line sync statistics found in a message.

    Parameters
    ----------
    message: str
        The sync message to scan, one line at a time.

    stat_keys: Optional[List[str]], default None
        The stat names to look for.
        Falls back to `['inserted', 'updated', 'upserted']` when empty or `None`.

    Returns
    -------
    A dictionary mapping each stat key to the sum of its values across all lines.
    """
    if not stat_keys:
        stat_keys = ['inserted', 'updated', 'upserted']

    ### Parse every line first, then add up the counts per key.
    per_line_stats = []
    for line in message.split('\n'):
        per_line_stats.append(extract_stats_from_line(line, stat_keys))

    totals = {}
    for stat_key in stat_keys:
        running_total = 0
        for line_stats in per_line_stats:
            running_total += line_stats.get(stat_key, 0)
        totals[stat_key] = running_total
    return totals
Given a sync message, return the insert, update, upsert stats from within.
Parameters
- message (str): The message to parse for statistics.
- stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to `['inserted', 'updated', 'upserted']`.
Returns
- A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If the string cannot be parsed (`AttributeError`), it is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    rich = import_rich()
    Text = attempt_import('rich.text').Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### A plain `''` has no `.spans`, so continuing would only raise again.
        return string

    ### Collect the (start, end) ranges which are not covered by a styled span.
    plain_indices = []
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)
Fill in non-formatted segments of ANSI text.
Parameters
- string (str): A string which contains ANSI escape codes.
- style (str): Style arguments to pass to `rich.text.Text`.
Returns
- A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Format `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    Returns
    -------
    The formatted message as a string.
    """
    from meerschaum.config.static import STATIC_CONFIG
    _init()
    try:
        status = 'success' if tup[0] else 'failure'
    except TypeError:
        ### `tup` is not subscriptable, so treat it as a failure with an empty payload.
        status = 'failure'
        tup = None, None

    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    ### Coerce the message to a string so the `(None, None)` fallback above
    ### does not raise inside `highlight_pipes()`.
    msg = (
        (' ' * left_padding)
        + status_config[CHARSET]['icon']
        + ' '
        + str(highlight_pipes(str(tup[1])))
    )
    lines = msg.split('\n')
    lines = [lines[0]] + [
        ((' ' + line if not line.startswith(' ') else line))
        for line in lines[1:]
    ]
    if ANSI:
        ### Only the first line (which carries the icon) is filled with the status style.
        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])

    msg = '\n'.join(lines)
    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
    return msg
Format `meerschaum.utils.typing.SuccessTuple`.
Parameters
- upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How many spaces to prepend to the message.
- calm (bool, default False): If `True`, use the default emoji and color scheme.
def get_console():
    """
    Build a fresh `rich` console, store it in the module-level `console`, and return it.

    Returns `None` (and sets `console` to `None`) if the console cannot be created.
    """
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    import_rich()
    console_module = attempt_import('rich.console')
    try:
        new_console = console_module.Console(
            force_terminal=True,
            color_system='truecolor',
        )
    except Exception:
        new_console = None
    console = new_console
    return new_console
Get the rich console.
def highlight_pipes(message: str) -> str:
    """
    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.

    Parameters
    ----------
    message: str
        The message which may contain substrings like `Pipe('a', 'b')`.

    Returns
    -------
    The message with each recognized `Pipe(...)` substring replaced by `pipe_repr()` output.
    Occurrences of `Pipe(` which cannot be parsed into a pipe are left exactly as written.
    """
    if 'Pipe(' not in message:
        return message

    segments = message.split('Pipe(')
    ### The text before the first `Pipe(` can never be a pipe's arguments.
    pieces = [segments[0]]
    Pipe = None
    for segment in segments[1:]:
        paren_index = segment.find(')')
        comma_index = segment.find(',')
        quote_indices = [
            index
            for index in (segment.find("'"), segment.find('"'))
            if index != -1
        ]

        ### A candidate needs a comma and a quote before the closing parenthesis.
        has_pipe = (
            paren_index != -1
            and comma_index != -1
            and bool(quote_indices)
            and comma_index < paren_index
            and min(quote_indices) < paren_index
        )
        if not has_pipe:
            ### Restore the delimiter removed by `split()` so no text is lost.
            pieces.append('Pipe(' + segment)
            continue

        if Pipe is None:
            from meerschaum import Pipe

        _d = {}
        code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1]
        try:
            ### NOTE(security): this executes text taken from the message.
            ### Only pass messages from trusted sources; consider parsing the
            ### arguments with `ast.literal_eval` instead of `exec`.
            exec(code, {'Pipe': Pipe, '_d': _d})
            pieces.append(pipe_repr(_d['pipe']) + segment[paren_index + 1:])
        except Exception:
            pieces.append('Pipe(' + segment)
    return ''.join(pieces)
Add syntax highlighting to an info message containing stringified meerschaum.Pipe
objects.
def make_header(
    message: str,
    ruler: str = '─',
) -> str:
    """Append a ruler line beneath a message.
    The ruler is repeated once per character of the longest line in the message.
    When `UNICODE` is disabled, `'-'` is used instead of `ruler`.

    Example:
        'My\\nheader' -> 'My\\nheader\\n──────'
    """
    from meerschaum.utils.formatting import ANSI, UNICODE, colored
    rule_char = ruler if UNICODE else '-'
    widest = max(len(line) for line in message.split('\n'))
    return message + "\n" + (rule_char * widest)
Format a message string with a ruler. Length of the ruler is the length of the longest line.
Example:
'My\nheader' -> 'My\nheader\n──────'
def pipe_repr(
    pipe: mrsm.Pipe,
    as_rich_text: bool = False,
    ansi: Optional[bool] = None,
) -> Union[str, 'rich.text.Text']:
    """
    Return a formatted string for representing a `meerschaum.Pipe`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `connector_keys`, `metric_key`, `location_key`,
        and `instance_keys` are rendered.

    as_rich_text: bool, default False
        If `True`, return the `rich.text.Text` object instead of a string.

    ansi: Optional[bool], default None
        If `False`, all styles are blanked (as they are when `ANSI` is falsy).

    Returns
    -------
    A `rich.text.Text` object if `as_rich_text` is `True`,
    otherwise the rendered string with newlines removed.
    """
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    Text = attempt_import('rich.text').Text

    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
    ### Keep the same keys but blank every style when ANSI output is disabled.
    if not ANSI or (ansi is False):
        styles = {k: '' for k in styles}
    ### Markup tags wrapping the literal `Pipe(`, `, ` and `)` pieces (empty when unstyled).
    _pipe_style_prefix, _pipe_style_suffix = (
        (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe']
        else ('', '')
    )
    text_obj = (
        Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix)
        + colored(("'" + pipe.connector_keys + "'"), style=styles['connector'], as_rich_text=True)
        + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix)
        + colored(("'" + pipe.metric_key + "'"), style=styles['metric'], as_rich_text=True)
        + (
            ( ### Only include the location when one is set.
                colored(', ', style=styles['punctuation'], as_rich_text=True)
                + colored(
                    ("'" + pipe.location_key + "'"),
                    style=styles['location'], as_rich_text=True
                )
            ) if pipe.location_key is not None
            else colored('', style='', as_rich_text=True)
        ) + (
            ( ### Add the `instance=` argument.
                colored(', instance=', style=styles['punctuation'], as_rich_text=True)
                + colored(
                    ("'" + pipe.instance_keys + "'"),
                    style=styles['instance'], as_rich_text=True
                )
            ### Omitted when the pipe's instance matches the configured `meerschaum:instance`.
            ) if pipe.instance_keys != get_config('meerschaum', 'instance')
            else colored('', style='', as_rich_text=True)
        )
        + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix)
    )
    if as_rich_text:
        return text_obj
    return rich_text_to_str(text_obj).replace('\n', '')
Return a formatted string for representing a meerschaum.Pipe
.
def pprint(
    *args,
    detect_password: bool = True,
    nopretty: bool = False,
    **kw
) -> None:
    """Pretty print an object according to the configured ANSI and UNICODE settings.
    If detect_password is True (default), search and replace passwords with '*' characters.
    Does not mutate objects.

    Parameters
    ----------
    *args:
        The objects to print. A single `(bool, str)` tuple is handed to `print_tuple()`.

    detect_password: bool, default True
        If `True`, pass copies of dictionaries through `replace_password()` before printing.

    nopretty: bool, default False
        If `True`, use the built-in `print` and try to serialize each argument as JSON.

    **kw:
        Keyword arguments for the printing function; unsupported keywords are filtered out.
    """
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
    from collections import OrderedDict
    import copy, json

    if (
        len(args) == 1
        and
        isinstance(args[0], tuple)
        and
        len(args[0]) == 2
        and
        isinstance(args[0][0], bool)
        and
        isinstance(args[0][1], str)
    ):
        return print_tuple(args[0])

    modify = True
    rich_pprint = None
    if ANSI and not nopretty:
        rich = import_rich()
        if rich is not None:
            rich_pretty = attempt_import('rich.pretty')
            if rich_pretty is not None:
                def _rich_pprint(*args, **kw):
                    _console = get_console()
                    _kw = filter_keywords(_console.print, **kw)
                    _console.print(*args, **_kw)
                rich_pprint = _rich_pprint

    if nopretty:
        func = print
    elif rich_pprint is not None:
        func = rich_pprint
    else:
        ### Also reached when ANSI is enabled but `rich` is unavailable,
        ### so a printing function is always bound.
        pprintpp = attempt_import('pprintpp', warn=False)
        try:
            func = pprintpp.pprint
        except Exception:
            import pprint as _pprint_module
            func = _pprint_module.pprint

    ### Probe whether the arguments can be deep-copied; if not, print them unmodified.
    try:
        copy.deepcopy(args)
    except Exception:
        modify = False

    _args = []
    for a in args:
        c = a
        ### convert OrderedDict into dict
        if isinstance(a, OrderedDict):
            c = dict_from_od(copy.deepcopy(c))
        _args.append(c)
    args = _args

    _args = list(args)
    if detect_password and modify:
        _args = []
        for a in args:
            c = a
            if isinstance(c, dict):
                c = replace_password(copy.deepcopy(c))
            if nopretty:
                try:
                    c = json.dumps(c)
                    is_json = True
                except Exception:
                    is_json = False
                if not is_json:
                    try:
                        c = str(c)
                    except Exception:
                        pass
            _args.append(c)

    ### filter out unsupported keywords
    func_kw = filter_keywords(func, **kw) if not nopretty else {}
    error_msg = None
    try:
        func(*_args, **func_kw)
    except Exception as e:
        error_msg = e
    if error_msg is not None:
        error(error_msg)
Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.
def pprint_pipes(pipes: PipesDict) -> None:
    """
    Print a stylized tree of a Pipes dictionary.
    Supports ANSI and UNICODE global settings.

    Parameters
    ----------
    pipes: PipesDict
        A nested dictionary of the form
        `{connector_keys: {metric_key: {location_key: pipe}}}`.

    Notes
    -----
    When `UNICODE` is disabled, the tree is drawn with `asciitree`.
    That path also works when `rich` cannot be imported, in which case
    the plain string output is printed.
    Otherwise the tree is rendered with `rich` trees inside columns.
    """
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.misc import sorted_dict
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, get_console
    rich = import_rich('rich', warn=False)
    Text = None
    if rich is not None:
        rich_text = attempt_import('rich.text', lazy=False)
        Text = rich_text.Text

    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
    if not ANSI:
        ### Keep the keys so lookups still work, but blank out every style.
        styles = {k: '' for k in styles}
    print()

    def ascii_print_pipes():
        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
        (though rich should auto-install when `attempt_import()` is called)."""
        asciitree = attempt_import('asciitree')
        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
        for conn_keys, metrics in pipes.items():
            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
            if Text is not None:
                replace_dict['connector'][_colored_conn_key] = (
                    Text(conn_keys, style=styles['connector'])
                )
            ascii_dict[_colored_conn_key] = {}
            for metric, locations in metrics.items():
                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
                if Text is not None:
                    replace_dict['metric'][_colored_metric_key] = (
                        Text(metric, style=styles['metric'])
                    )
                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
                for location, pipe in locations.items():
                    _location_style = styles[('none' if location is None else 'location')]
                    pipe_addendum = '\n ' + pipe.__repr__() + '\n'
                    _colored_location = colored(
                        icons['location'] + str(location), style=_location_style
                    )
                    _colored_location_key = _colored_location + pipe_addendum
                    if Text is not None:
                        replace_dict['location'][_colored_location] = (
                            Text(str(location), style=_location_style)
                        )
                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}

        tree = asciitree.LeftAligned()
        output = ''
        cols = []

        def _plain(text: str):
            """Wrap unstyled text in `Text` when rich is available, else return it as-is."""
            return Text(text) if Text is not None else text

        def _styled(text: str, style: str):
            """Style text with rich's `Text` when available, else fall back to `colored()`."""
            if Text is not None:
                return Text(text, style=style)
            return colored(text, style=style)

        ### Legend explaining which style corresponds to which level of the tree.
        key_str = (
            _plain(" ")
            + _styled("Key", 'underline')
            + _plain('\n\n ')
            + _styled("Connector", styles['connector'])
            + _plain('\n +-- ')
            + _styled("Metric", styles['metric'])
            + _plain('\n +-- ')
            + _styled("Location", styles['location'])
            + _plain('\n\n')
        )

        output += str(key_str)
        cols.append(key_str)

        def replace_tree_text(tree_str: str) -> 'rich.text.Text':
            """Replace the colored words with stylized Text instead.
            Only called when rich is available and ANSI or UNICODE is enabled."""
            tree_text = Text(tree_str)
            for replacements in replace_dict.values():
                for _colored, _text in replacements.items():
                    parts = []
                    lines = tree_text.split(_colored)
                    for part in lines:
                        parts += [part, _text]
                    ### Drop the trailing replacement unless the final segment is empty.
                    if lines[-1] != Text(''):
                        parts = parts[:-1]
                    _tree_text = Text('')
                    for part in parts:
                        _tree_text += part
                    tree_text = _tree_text
            return tree_text

        tree_output = ""
        for k, v in ascii_dict.items():
            ### Render each connector's branch once and reuse the string.
            rendered = tree({k: v})
            tree_output += rendered + '\n\n'
            if Text is None:
                ### Without rich there is no `Text` to build: keep the plain string
                ### (previously this crashed when ANSI or UNICODE was enabled).
                _col = rendered
            elif not UNICODE and not ANSI:
                _col = Text(rendered)
            else:
                _col = replace_tree_text(rendered)
            cols.append(_col)
        if output:
            ### Trim the final pair of newlines appended after the last branch.
            tree_output = tree_output[:-2]
        output += tree_output

        if rich is None:
            return print(output)

        rich_columns = attempt_import('rich.columns')
        Columns = rich_columns.Columns
        columns = Columns(cols)
        get_console().print(columns)

    if not UNICODE:
        return ascii_print_pipes()

    rich_panel, rich_tree, rich_text, rich_columns = attempt_import(
        'rich.panel',
        'rich.tree',
        'rich.text',
        'rich.columns',
    )
    Panel = rich_panel.Panel
    Tree = rich_tree.Tree
    Text = rich_text.Text
    Columns = rich_columns.Columns

    key_panel = Panel(
        (
            Text("\n") +
            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
        ),
        title = Text(icons['key'] + "Keys", style=styles['guide']),
        border_style = styles['guide'],
        expand = True
    )

    conn_trees = {}
    metric_trees = {}
    pipes = sorted_dict(pipes)
    for conn_keys, metrics in pipes.items():
        conn_trees[conn_keys] = Tree(
            Text(
                icons['connector'] + conn_keys,
                style = styles['connector'],
            ),
            guide_style = styles['connector']
        )
        metric_trees[conn_keys] = {}
        for metric, locations in metrics.items():
            metric_trees[conn_keys][metric] = Tree(
                Text(
                    icons['metric'] + metric,
                    style = styles['metric']
                ),
                guide_style = styles['metric']
            )
            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
            for location, pipe in locations.items():
                _location = (
                    Text(str(location), style=styles['none']) if location is None
                    else Text(location, style=styles['location'])
                )
                _location = (
                    Text(icons['location'])
                    + _location + Text('\n')
                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
                )
                metric_trees[conn_keys][metric].add(_location)

    ### The legend comes first, followed by one tree per connector.
    cols = [key_panel, *conn_trees.values()]

    columns = Columns(cols)
    get_console().print(columns)
Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.
def print_options(
    options: Optional[Iterable[Any]] = None,
    nopretty: bool = False,
    no_rich: bool = False,
    name: str = 'options',
    header: Optional[str] = None,
    num_cols: Optional[int] = None,
    adjust_cols: bool = True,
    sort_options: bool = False,
    number_options: bool = False,
    **kw
) -> None:
    """
    Print items in an iterable as a fancy table.

    Parameters
    ----------
    options: Optional[Iterable[Any]], default None
        The iterable to be printed. Each item is converted with `str()`.

    nopretty: bool, default False
        If `True`, don't use fancy formatting.

    no_rich: bool, default False
        If `True`, don't use `rich` to format the output.

    name: str, default 'options'
        The text in the default header after `'Available'`.

    header: Optional[str], default None
        If provided, override `name` and use this as the header text.

    num_cols: Optional[int], default None
        How many columns in the table. Depends on the terminal size. If `None`, use 8.

    adjust_cols: bool, default True
        If `True`, adjust the number of columns depending on the terminal size.

    sort_options: bool, default False
        If `True`, print the options in sorted order.

    number_options: bool, default False
        If `True`, print the option's number in the list (1 index).

    **kw
        Accepted for call-site compatibility; not used by this function.
    """
    from meerschaum.utils.packages import import_rich
    from meerschaum.utils.formatting import make_header, highlight_pipes
    from meerschaum.utils.misc import get_cols_lines, string_width

    _options = [str(o) for o in options] if options is not None else []
    if sort_options:
        _options = sorted(_options)
    _header = f"\nAvailable {name}" if header is None else header

    if num_cols is None:
        num_cols = 8

    def _print_options_no_rich():
        """Plain-text fallback: one option per line, with a header unless `nopretty`."""
        if not nopretty:
            print()
            print(make_header(_header))
        for i, option in enumerate(_options):
            marker = '-' if not number_options else (str(i + 1) + '.')
            if not nopretty:
                print(f" {marker} ", end="")
            print(option)
        if not nopretty:
            print()

    rich = import_rich()
    if rich is None or nopretty or no_rich:
        _print_options_no_rich()
        return None

    ### Prevent too many options from being truncated on small terminals.
    if adjust_cols and _options:
        _cols, _lines = get_cols_lines()
        while num_cols > 1:
            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
            num_too_big = sum(1 for o in _options if string_width(o) > cell_len)
            ### Drop a column while more than a third of the options would not fit.
            if num_too_big > int(len(_options) / 3):
                num_cols -= 1
                continue
            break

    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.packages import attempt_import
    rich_table = attempt_import('rich.table')
    Text = attempt_import('rich.text').Text
    box = attempt_import('rich.box')
    Table = rich_table.Table

    ### NOTE: `_header` is never `None` here (it falls back to "Available {name}"),
    ### so the table always carries a title.
    table = Table(
        title=_header,
        box=box.SIMPLE,
        show_header=False,
        show_footer=False,
        title_style='',
        expand = True,
    )
    for _ in range(num_cols):
        table.add_column()

    if len(_options) < 12:
        ### If fewer than 12 items, use a single column
        for i, option in enumerate(_options):
            item = highlight_pipes(option)
            if number_options:
                item = str(i + 1) + '. ' + item
            table.add_row(Text.from_ansi(item))
    else:
        ### Otherwise, fill the table column-by-column (items read top to bottom).
        num_rows = (len(_options) + num_cols - 1) // num_cols
        for i in range(num_rows):
            row = []
            for j in range(num_cols):
                index = i + j * num_rows
                if index < len(_options):
                    item = highlight_pipes(_options[index])
                    if number_options:
                        ### Number by the item's position in the list, not by the row.
                        item = str(index + 1) + '. ' + item
                    row.append(Text.from_ansi(item))
                else:
                    row.append('')
            table.add_row(*row)

    get_console().print(table)
    return None
Print items in an iterable as a fancy table.
Parameters
- options (Optional[Iterable[Any]], default None): The iterable to be printed.
- nopretty (bool, default False): If True, don't use fancy formatting.
- no_rich (bool, default False): If True, don't use rich to format the output.
- name (str, default 'options'): The text in the default header after 'Available'.
- header (Optional[str], default None): If provided, override name and use this as the header text.
- num_cols (Optional[int], default None): How many columns in the table. Depends on the terminal size. If None, use 8.
- adjust_cols (bool, default True): If True, adjust the number of columns depending on the terminal size.
- sort_options (bool, default False): If True, print the options in sorted order.
- number_options (bool, default False): If True, print the option's number in the list (1-indexed).
def print_pipes_results(
    pipes_results: Dict[mrsm.Pipe, SuccessTuple],
    success_header: Optional[str] = 'Successes',
    failure_header: Optional[str] = 'Failures',
    nopretty: bool = False,
    **kwargs: Any
) -> None:
    """
    Print the pipes and their result SuccessTuples.

    Parameters
    ----------
    pipes_results: Dict[mrsm.Pipe, SuccessTuple]
        A dictionary mapping pipes to their resulting SuccessTuples.

    success_header: Optional[str], default 'Successes'
        The header to print above the successful pipes.

    failure_header: Optional[str], default 'Failures'
        The header to print above the failed pipes.

    nopretty: bool, default False
        Passed through to both `format_pipe_success_tuple` and `print_options`.

    kwargs: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.print_options`.
    """
    from meerschaum.utils.misc import print_options

    ### Partition the formatted results by the success flag (first tuple element).
    success_options, failure_options = [], []
    for pipe, success_tuple in pipes_results.items():
        formatted = format_pipe_success_tuple(pipe, success_tuple, nopretty=nopretty)
        if success_tuple[0]:
            success_options.append(formatted)
        else:
            failure_options.append(formatted)

    ### Skip a section entirely when it has nothing to show.
    if success_options:
        print_options(
            success_options,
            header = success_header,
            nopretty = nopretty,
            **kwargs
        )
    if failure_options:
        print_options(
            failure_options,
            header = failure_header,
            nopretty = nopretty,
            **kwargs
        )
Print the pipes and their result SuccessTuples.
Parameters
- pipes_results (Dict[mrsm.Pipe, SuccessTuple]): A dictionary mapping pipes to their resulting SuccessTuples.
- success_header (Optional[str], default 'Successes'): The header to print above the successful pipes.
- failure_header (Optional[str], default 'Failures'): The header to print above the failed pipes.
- kwargs (Any): All other keyword arguments are passed to meerschaum.utils.misc.print_options.
def print_tuple(
    tup: mrsm.SuccessTuple,
    skip_common: bool = True,
    common_only: bool = False,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> None:
    """
    Print a formatted `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success tuple to print. Its second element (the message) is stripped
        and compared against the configured list of ignorable messages.

    skip_common: bool, default True
        If `True`, do not print common success tuples (i.e. `(True, "Success")`).

    common_only: bool, default False
        If `True`, only print if the success tuple is common.
        Takes precedence over `skip_common`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.
        NOTE(review): this value is not forwarded to `format_success_tuple` here;
        confirm whether that is intentional.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    """
    from meerschaum.config.static import STATIC_CONFIG

    ### Messages considered "common" and therefore skippable.
    ignorable = STATIC_CONFIG['system']['success']['ignore']

    if common_only:
        ### Only common messages pass through; `skip_common` is disregarded.
        if tup[1].strip() not in ignorable:
            return
    elif skip_common:
        if tup[1].strip() in ignorable:
            return

    formatted = format_success_tuple(
        tup,
        upper_padding=upper_padding,
        lower_padding=lower_padding,
        calm=calm,
        _progress=_progress,
    )
    print(formatted)
Print a formatted meerschaum.utils.typing.SuccessTuple.
Parameters
- skip_common (bool, default True): If True, do not print common success tuples (i.e. (True, "Success")).
- common_only (bool, default False): If True, only print if the success tuple is common.
- upper_padding (int, default 0): How many newlines to prepend to the message.
- lower_padding (int, default 0): How many newlines to append to the message.
- left_padding (int, default 1): How many spaces to prepend to the message.
- calm (bool, default False): If True, use the default emoji and color scheme.
def translate_rich_to_termcolor(*colors) -> tuple:
    """Translate between rich and more_termcolor terminology.
    This is probably prone to breaking.

    Parameters
    ----------
    *colors : str
        Color/style strings in rich's notation, e.g. `'bright_red on white'`.

    Returns
    -------
    A tuple of strings in which every `'bright_'` is replaced by `'bright '`
    and each `' on <color>'` suffix is split into its own `'on <color>'` entry.

    """
    translated = []
    for color in colors:
        ### 'bright_red' -> 'bright red'; then peel off any ' on <background>' parts.
        foreground, *backgrounds = color.replace('bright_', 'bright ').split(' on ')
        translated.append(foreground)
        translated.extend('on ' + background for background in backgrounds)
    return tuple(translated)
Translate between rich and more_termcolor terminology. This is probably prone to breaking.
Parameters
- *colors: The rich-style color strings to translate.
Returns
- A tuple of the translated color strings.