meerschaum.utils.formatting

Utilities for formatting output text

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utilities for formatting output text
  7"""
  8
  9from __future__ import annotations
 10import platform
 11import os
 12import sys
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable
 15from meerschaum.utils.formatting._shell import make_header
 16from meerschaum.utils.formatting._pprint import pprint
 17from meerschaum.utils.formatting._pipes import (
 18    pprint_pipes,
 19    highlight_pipes,
 20    format_pipe_success_tuple,
 21    print_pipes_results,
 22    extract_stats_from_message,
 23    pipe_repr,
 24)
 25from meerschaum.utils.threading import Lock, RLock
 26
# Lazily-resolved module attributes returned by the module-level `__getattr__`.
# A value of `None` means the attribute has not been resolved yet.
# `RESET` is the ANSI escape sequence which resets all text attributes.
_attrs = {
    'ANSI': None,
    'UNICODE': None,
    'CHARSET': None,
    'RESET': '\033[0m',
}
# Public names of this module, stored in sorted order.
__all__ = sorted([
    'ANSI', 'CHARSET', 'UNICODE', 'RESET',
    'colored',
    'translate_rich_to_termcolor',
    'get_console',
    'format_success_tuple',
    'print_tuple',
    'print_options',
    'fill_ansi',
    'pprint',
    'highlight_pipes',
    'pprint_pipes',
    'make_header',
    'pipe_repr',
    'print_pipes_results',
    'extract_stats_from_message',
])
__pdoc__ = {}
# Re-entrant lock held by `colored()` while it checks and sets `_colorama_init`.
_locks = {
    '_colorama_init': RLock(),
}
 54
 55
 56def colored_fallback(*args, **kw):
 57    return ' '.join(args)
 58
 59def translate_rich_to_termcolor(*colors) -> tuple:
 60    """Translate between rich and more_termcolor terminology.
 61    This is probably prone to breaking.
 62
 63    Parameters
 64    ----------
 65    *colors :
 66        
 67
 68    Returns
 69    -------
 70
 71    """
 72    _colors = []
 73    for c in colors:
 74        _c_list = []
 75        ### handle 'bright'
 76        c = c.replace('bright_', 'bright ')
 77
 78        ### handle 'on'
 79        if ' on ' in c:
 80            _on = c.split(' on ')
 81            _colors.append(_on[0])
 82            for _c in _on[1:]:
 83                _c_list.append('on ' + _c)
 84        else:
 85            _c_list += [c]
 86
 87        _colors += _c_list
 88
 89    return tuple(_colors)
 90
 91
 92def rich_text_to_str(text: 'rich.text.Text') -> str:
 93    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
 94    _console = get_console()
 95    if _console is None:
 96        return str(text)
 97    with console.capture() as cap:
 98        console.print(text)
 99    string = cap.get()
100    return string[:-1]
101
102
def _init():
    """
    Initial color settings (mostly for Windows).

    On Windows, force UTF-8 standard streams, enable ANSI escape processing
    on the console, and initialize `colorama`.
    If `colorama` fails to initialize or `more_termcolor` is unavailable,
    fall back to plain ASCII output by updating `_attrs`.

    Returns
    -------
    `True` if nothing needed to be done (non-Windows) or initialization succeeded,
    otherwise `False`.
    """
    if platform.system() != "Windows":
        ### Nothing to set up; report success so callers may cache the result.
        return True

    os.environ.setdefault('PYTHONIOENCODING', 'utf-8')
    os.environ.setdefault('PYTHONLEGACYWINDOWSSTDIO', 'utf-8')
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        ### Streams may be `None` or replaced by objects without `reconfigure`.
        reconfigure = getattr(stream, 'reconfigure', None)
        if reconfigure is not None:
            reconfigure(encoding='utf-8')

    from ctypes import windll
    k = windll.kernel32
    ### -11 is STD_OUTPUT_HANDLE; mode 7 enables processed output,
    ### wrap-at-EOL, and virtual terminal (ANSI) processing.
    k.SetConsoleMode(k.GetStdHandle(-11), 7)
    os.system("color")

    from meerschaum.utils.packages import attempt_import
    ### init colorama for Windows color output
    colorama, more_termcolor = attempt_import(
        'colorama',
        'more_termcolor',
        lazy = False,
        warn = False,
        color = False,
    )
    success = True
    try:
        colorama.init(autoreset=False)
    except Exception:
        import traceback
        traceback.print_exc()
        success = False

    if more_termcolor is None:
        success = False

    if not success:
        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'

    return success
145
_colorama_init = False


def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.

    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
    If `more_termcolor` cannot color the text, the original `text` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Only (re)run the initialization while it has not yet reported success.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        rich = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    ### First try the colors verbatim, then retry with rich names translated.
    for translate in (False, True):
        try:
            _colors = translate_rich_to_termcolor(*colors) if translate else colors
            colored_text = more_termcolor.colored(text, *_colors, **kw)
        except Exception:
            continue
        if colored_text is not None:
            return colored_text

    ### NOTE: warn here?
    return text
212
console = None


def get_console():
    """
    Get the rich console.

    A new `rich.console.Console` (forced terminal, truecolor) is built on every call
    and stored in the module-level `console` variable.

    Returns
    -------
    The new console, or `None` if it could not be constructed.
    """
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    console_module = attempt_import('rich.console')
    try:
        new_console = console_module.Console(force_terminal=True, color_system='truecolor')
    except Exception:
        new_console = None
    console = new_console
    return console
225
226
def print_tuple(
    tup: mrsm.SuccessTuple,
    skip_common: bool = True,
    common_only: bool = False,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> None:
    """
    Format and print a `meerschaum.utils.typing.SuccessTuple`.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success flag and message to print.

    skip_common: bool, default True
        If `True`, do not print common success tuples (i.e. `(True, "Success")`).

    common_only: bool, default False
        If `True`, only print if the success tuple is common.
        Takes precedence over `skip_common`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the default emoji and color scheme.

    _progress: Optional[rich.progress.Progress], default None
        Forwarded to `format_success_tuple()`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    omit_messages = STATIC_CONFIG['system']['success']['ignore']

    ### The message is only inspected when one of the filters is active.
    if common_only:
        do_print = tup[1].strip() in omit_messages
    elif skip_common:
        do_print = tup[1].strip() not in omit_messages
    else:
        do_print = True

    if not do_print:
        return

    print(format_success_tuple(
        tup,
        upper_padding=upper_padding,
        lower_padding=lower_padding,
        left_padding=left_padding,
        calm=calm,
        _progress=_progress,
    ))
283
284
def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Format a `meerschaum.utils.typing.SuccessTuple` as a printable string.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success flag and message.
        A value which cannot be indexed as a pair is treated as a failure
        whose message is `'None'`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the `<status>_calm` formatting configuration.

    _progress: Optional[rich.progress.Progress], default None
        Currently unused by this function.

    Returns
    -------
    The icon-prefixed message, with its first line styled when ANSI is enabled.
    """
    _init()
    try:
        is_success, message = tup[0], tup[1]
    except (TypeError, LookupError):
        ### Not a usable pair (e.g. `None` or too short): report a failure.
        is_success, message = False, None

    status = 'success' if is_success else 'failure'
    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    ### Coerce to `str` first: `highlight_pipes` performs substring checks on its input.
    msg = (
        (' ' * left_padding)
        + status_config[CHARSET]['icon']
        + ' '
        + str(highlight_pipes(str(message)))
    )
    first_line, *other_lines = msg.split('\n')
    ### Indent continuation lines which are not already indented.
    lines = [first_line] + [
        (line if line.startswith(' ') else '    ' + line)
        for line in other_lines
    ]
    if ANSI:
        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])

    msg = '\n'.join(lines)
    return ('\n' * upper_padding) + msg + ('\n' * lower_padding)
337
338
def print_options(
    options: Optional[Iterable[Any]] = None,
    nopretty: bool = False,
    no_rich: bool = False,
    name: str = 'options',
    header: Optional[str] = None,
    num_cols: Optional[int] = None,
    adjust_cols: bool = True,
    sort_options: bool = False,
    number_options: bool = False,
    **kw
) -> None:
    """
    Print items in an iterable as a fancy table.

    Parameters
    ----------
    options: Optional[Iterable[Any]], default None
        The iterable to be printed. Each item is converted with `str()`.

    nopretty: bool, default False
        If `True`, don't use fancy formatting.

    no_rich: bool, default False
        If `True`, don't use `rich` to format the output.

    name: str, default 'options'
        The text in the default header after `'Available'`.

    header: Optional[str], default None
        If provided, override `name` and use this as the header text.

    num_cols: Optional[int], default None
        How many columns in the table. Depends on the terminal size. If `None`, use 8.

    adjust_cols: bool, default True
        If `True`, adjust the number of columns depending on the terminal size.

    sort_options: bool, default False
        If `True`, print the options in sorted order.

    number_options: bool, default False
        If `True`, print the option's number in the list (1 index).

    **kw:
        Accepted and ignored.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.formatting import make_header, highlight_pipes, get_console
    ### NOTE(review): `_actions` is never used below; the import is retained in case
    ### loading `meerschaum.actions` has side effects callers rely on. Confirm and remove.
    from meerschaum.actions import actions as _actions
    from meerschaum.utils.misc import get_cols_lines, string_width

    _options = [str(o) for o in options] if options is not None else []
    if sort_options:
        _options = sorted(_options)
    _header = f"\nAvailable {name}" if header is None else header

    if num_cols is None:
        num_cols = 8

    def _print_options_no_rich():
        if not nopretty:
            print()
            print(make_header(_header))
        ### print actions
        for i, option in enumerate(_options):
            marker = '-' if not number_options else (str(i + 1) + '.')
            if not nopretty:
                print(f"  {marker} ", end="")
            print(option)
        if not nopretty:
            print()

    rich = import_rich()
    if rich is None or nopretty or no_rich:
        _print_options_no_rich()
        return None

    ### Prevent too many options from being truncated on small terminals.
    if adjust_cols and _options:
        _cols, _lines = get_cols_lines()
        while num_cols > 1:
            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
            num_too_big = sum(1 for o in _options if string_width(o) > cell_len)
            ### Stop shrinking once at most a third of the options overflow a cell.
            if num_too_big <= int(len(_options) / 3):
                break
            num_cols -= 1

    Table = attempt_import('rich.table').Table
    Text = attempt_import('rich.text').Text
    box = attempt_import('rich.box')

    ### `_header` is always a string at this point, so the titled table is always used.
    table = Table(
        title=_header,
        box=box.SIMPLE,
        show_header=False,
        show_footer=False,
        title_style='',
        expand = True,
    )
    for _ in range(num_cols):
        table.add_column()

    def _cell(index):
        """Build the table cell for the option at `index`, numbered by its position."""
        item = highlight_pipes(_options[index])
        if number_options:
            item = str(index + 1) + '. ' + item
        return Text.from_ansi(item)

    if len(_options) < 12:
        ### If fewer than 12 items, use a single column
        for index in range(len(_options)):
            table.add_row(_cell(index))
    else:
        ### Otherwise, fill the table column by column.
        num_rows = (len(_options) + num_cols - 1) // num_cols
        for i in range(num_rows):
            row = []
            for j in range(num_cols):
                index = i + j * num_rows
                row.append(_cell(index) if index < len(_options) else '')
            table.add_row(*row)

    get_console().print(table)
    return None
484
485
def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If the ANSI string cannot be parsed, `string` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    rich = import_rich()
    Text = attempt_import('rich.text').Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### Without a parsed `Text` there are no spans to inspect; leave the input as-is.
        return string

    ### NOTE(review): presumably `iterate_chunks` yields non-overlapping pairs of spans;
    ### if so, the gap between the 2nd and 3rd span (4th and 5th, ...) is never examined.
    plain_indices = []
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        ### The fill value (an int) stands in for the end of the text.
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        ### No unstyled ranges were found: style the plain text as a whole.
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)
531
532
def __getattr__(name: str) -> Any:
    """
    Lazily load module-level variables.

    Parameters
    ----------
    name: str
        The attribute being looked up on this module.

    Returns
    -------
    The cached or freshly resolved value for names in `_attrs`
    (read from the `formatting` configuration, or derived from `UNICODE` for `CHARSET`),
    otherwise the module global of that name.

    Raises
    ------
    AttributeError
        If `name` is a dunder or cannot be found.
    """
    if name.startswith('__') and name.endswith('__'):
        raise AttributeError("Cannot import dunders from this module.")

    if name in _attrs:
        if _attrs[name] is not None:
            return _attrs[name]
        from meerschaum.config import get_config
        if name.lower() in get_config('formatting'):
            _attrs[name] = get_config('formatting', name.lower())
        elif name == 'CHARSET':
            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
        return _attrs[name]

    try:
        return globals()[name]
    except KeyError:
        raise AttributeError(f"Could not find '{name}'") from None
ANSI
CHARSET
RESET
UNICODE
def colored( text: str, *colors, as_rich_text: bool = False, **kw) -> Union[str, rich.text.Text]:
def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
    """Apply colors and rich styles to a string.
    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
    Otherwise attempt to use the legacy `more_termcolor.colored` method.

    Parameters
    ----------
    text: str
        The string to apply formatting to.

    *colors:
        A list of colors to pass to `more_termcolor.colored()`.
        Has no effect if `style` is provided.

    style: str, default None
        If provided, pass to `rich` for processing.

    as_rich_text: bool, default False
        If `True`, return a `rich.Text` object.
        `style` must be provided.

    **kw:
        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.

    Returns
    -------
    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
    If `more_termcolor` cannot color the text, the original `text` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    global _colorama_init
    ### Only (re)run the initialization while it has not yet reported success.
    with _locks['_colorama_init']:
        if not _colorama_init:
            _colorama_init = _init()

    if 'style' in kw:
        rich = import_rich()
        rich_text = attempt_import('rich.text')
        text_obj = rich_text.Text(text, **kw)
        if as_rich_text:
            return text_obj
        return rich_text_to_str(text_obj)

    more_termcolor = attempt_import('more_termcolor', lazy=False)
    ### First try the colors verbatim, then retry with rich names translated.
    for translate in (False, True):
        try:
            _colors = translate_rich_to_termcolor(*colors) if translate else colors
            colored_text = more_termcolor.colored(text, *_colors, **kw)
        except Exception:
            continue
        if colored_text is not None:
            return colored_text

    ### NOTE: warn here?
    return text

Apply colors and rich styles to a string. If a style keyword is provided, a rich.text.Text object will be parsed into a string. Otherwise attempt to use the legacy more_termcolor.colored method.

Parameters
  • text (str): The string to apply formatting to.
  • *colors: A list of colors to pass to more_termcolor.colored(). Has no effect if style is provided.
  • style (str, default None): If provided, pass to rich for processing.
  • as_rich_text (bool, default False): If True, return a rich.Text object. style must be provided.
  • **kw: Keyword arguments to pass to rich.text.Text or more_termcolor.
Returns
  • An ANSI-formatted string or a rich.text.Text object if as_rich_text is True.
def extract_stats_from_message(message: str, stat_keys: Optional[List[str]] = None) -> Dict[str, int]:
def extract_stats_from_message(
    message: str,
    stat_keys: Optional[List[str]] = None,
) -> Dict[str, int]:
    """
    Total the insert, update, and upsert statistics found in a sync message.

    Parameters
    ----------
    message: str
        The message to parse for statistics.

    stat_keys: Optional[List[str]], default None
        If provided, search for these words (case insensitive) in the message.
        Defaults to `['inserted', 'updated', 'upserted']`.

    Returns
    -------
    A dictionary mapping the stat keys to the total number of rows affected.
    """
    keys = stat_keys or ['inserted', 'updated', 'upserted']
    totals = {key: 0 for key in keys}
    # Each line is parsed on its own; the per-line counts are accumulated per key.
    for line in message.split('\n'):
        line_stats = extract_stats_from_line(line, keys)
        for key in totals:
            totals[key] += line_stats.get(key, 0)
    return totals

Given a sync message, return the insert, update, upsert stats from within.

Parameters
  • message (str): The message to parse for statistics.
  • stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to ['inserted', 'updated', 'upserted'].
Returns
  • A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
def fill_ansi(string: str, style: str = '') -> str:
    """
    Fill in non-formatted segments of ANSI text.

    Parameters
    ----------
    string: str
        A string which contains ANSI escape codes.

    style: str
        Style arguments to pass to `rich.text.Text`.

    Returns
    -------
    A string with ANSI styling applied to the segments which don't yet have a style applied.
    If the ANSI string cannot be parsed, `string` is returned unchanged.
    """
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.misc import iterate_chunks
    rich = import_rich()
    Text = attempt_import('rich.text').Text
    try:
        msg = Text.from_ansi(string)
    except AttributeError:
        import traceback
        traceback.print_stack()
        ### Without a parsed `Text` there are no spans to inspect; leave the input as-is.
        return string

    ### NOTE(review): presumably `iterate_chunks` yields non-overlapping pairs of spans;
    ### if so, the gap between the 2nd and 3rd span (4th and 5th, ...) is never examined.
    plain_indices = []
    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
        left = left_span.end
        ### The fill value (an int) stands in for the end of the text.
        right = right_span.start if not isinstance(right_span, int) else right_span
        if left != right:
            plain_indices.append((left, right))
    if msg.spans:
        if msg.spans[0].start != 0:
            plain_indices = [(0, msg.spans[0].start)] + plain_indices
        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
            plain_indices.append((msg.spans[-1].end, len(msg)))

    if plain_indices:
        for left, right in plain_indices:
            msg.stylize(style, left, right)
    else:
        ### No unstyled ranges were found: style the plain text as a whole.
        msg = Text(str(msg), style)

    return rich_text_to_str(msg)

Fill in non-formatted segments of ANSI text.

Parameters
  • string (str): A string which contains ANSI escape codes.
  • style (str): Style arguments to pass to rich.text.Text.
Returns
  • A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_success_tuple( tup: Tuple[bool, str], upper_padding: int = 0, lower_padding: int = 0, left_padding: int = 1, calm: bool = False, _progress: Optional[rich.progress.Progress] = None) -> str:
def format_success_tuple(
    tup: mrsm.SuccessTuple,
    upper_padding: int = 0,
    lower_padding: int = 0,
    left_padding: int = 1,
    calm: bool = False,
    _progress: Optional['rich.progress.Progress'] = None,
) -> str:
    """
    Format a `meerschaum.utils.typing.SuccessTuple` as a printable string.

    Parameters
    ----------
    tup: mrsm.SuccessTuple
        The success flag and message.
        A value which cannot be indexed as a pair is treated as a failure
        whose message is `'None'`.

    upper_padding: int, default 0
        How many newlines to prepend to the message.

    lower_padding: int, default 0
        How many newlines to append to the message.

    left_padding: int, default 1
        How many spaces to prepend to the message.

    calm: bool, default False
        If `True`, use the `<status>_calm` formatting configuration.

    _progress: Optional[rich.progress.Progress], default None
        Currently unused by this function.

    Returns
    -------
    The icon-prefixed message, with its first line styled when ANSI is enabled.
    """
    _init()
    try:
        is_success, message = tup[0], tup[1]
    except (TypeError, LookupError):
        ### Not a usable pair (e.g. `None` or too short): report a failure.
        is_success, message = False, None

    status = 'success' if is_success else 'failure'
    if calm:
        status += '_calm'

    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
    from meerschaum.config import get_config
    status_config = get_config('formatting', status, patch=True)

    ### Coerce to `str` first: `highlight_pipes` performs substring checks on its input.
    msg = (
        (' ' * left_padding)
        + status_config[CHARSET]['icon']
        + ' '
        + str(highlight_pipes(str(message)))
    )
    first_line, *other_lines = msg.split('\n')
    ### Indent continuation lines which are not already indented.
    lines = [first_line] + [
        (line if line.startswith(' ') else '    ' + line)
        for line in other_lines
    ]
    if ANSI:
        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])

    msg = '\n'.join(lines)
    return ('\n' * upper_padding) + msg + ('\n' * lower_padding)

Format meerschaum.utils.typing.SuccessTuple.

Parameters
  • upper_padding (int, default 0): How many newlines to prepend to the message.
  • lower_padding (int, default 0): How many newlines to append to the message.
  • left_padding (int, default 1): How many spaces to prepend to the message.
  • calm (bool, default False): If True, use the default emoji and color scheme.
def get_console():
def get_console():
    """
    Get the rich console.

    A new `rich.console.Console` (forced terminal, truecolor) is built on every call
    and stored in the module-level `console` variable.

    Returns
    -------
    The new console, or `None` if it could not be constructed.
    """
    global console
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    console_module = attempt_import('rich.console')
    try:
        new_console = console_module.Console(force_terminal=True, color_system='truecolor')
    except Exception:
        new_console = None
    console = new_console
    return console

Get the rich console.

def highlight_pipes(message: str) -> str:
def highlight_pipes(message: str) -> str:
    """
    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.

    Parameters
    ----------
    message: str
        The message which may contain text such as `Pipe('a', 'b')`.

    Returns
    -------
    The message with each parseable `Pipe(...)` occurrence replaced by its `pipe_repr()`.
    Occurrences which cannot be parsed, and all other text, are kept verbatim.
    """
    if 'Pipe(' not in message:
        return message

    ### The text before the first 'Pipe(' can never be a constructor argument list.
    head, *segments = message.split('Pipe(')
    parts = [head]
    _d = {}
    for segment in segments:
        comma_index = segment.find(',')
        paren_index = segment.find(')')
        quote_indices = [
            index
            for index in (segment.find("'"), segment.find('"'))
            if index != -1
        ]

        ### A candidate needs a comma and a quote before the first closing parenthesis.
        has_pipe = (
            comma_index != -1
            and paren_index != -1
            and bool(quote_indices)
            and comma_index <= paren_index
            and min(quote_indices) <= paren_index
        )
        if not has_pipe:
            ### Restore the delimiter removed by `split()` so the text is unchanged.
            parts.append('Pipe(' + segment)
            continue

        ### `Pipe` and `_d` must be locals of this function: the executed code refers to them.
        from meerschaum import Pipe
        code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1]
        try:
            ### SECURITY NOTE(review): this executes text taken from `message`.
            ### Only pass trusted messages; consider parsing literals with `ast` instead.
            exec(code)
            _to_add = pipe_repr(_d['pipe']) + segment[paren_index + 1:]
            _ = _d.pop('pipe', None)
        except Exception:
            _to_add = 'Pipe(' + segment
        parts.append(_to_add)
    return ''.join(parts)

Add syntax highlighting to an info message containing stringified meerschaum.Pipe objects.

def make_header(message: str, ruler: str = '─') -> str:
def make_header(
    message: str,
    ruler: str = '─',
) -> str:
    """Format a message string with a ruler underneath.
    The ruler is as long as the longest line of the message.
    When `UNICODE` is falsy, `'-'` is used as the ruler character.

    Example:
        'My\\nheader' -> 'My\\nheader\\n──────'
    """
    from meerschaum.utils.formatting import ANSI, UNICODE, colored
    rule_char = ruler if UNICODE else '-'
    widest = max(len(line) for line in message.split('\n'))
    return message + "\n" + (rule_char * widest)

Format a message string with a ruler. Length of the ruler is the length of the longest line.

Example:
    'My\nheader' -> 'My\nheader\n──────'

def pipe_repr( pipe: meerschaum.Pipe, as_rich_text: bool = False, ansi: Optional[bool] = None) -> Union[str, rich.text.Text]:
def pipe_repr(
    pipe: mrsm.Pipe,
    as_rich_text: bool = False,
    ansi: Optional[bool] = None,
) -> Union[str, 'rich.text.Text']:
    """
    Return a formatted string for representing a `meerschaum.Pipe`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose keys are rendered.

    as_rich_text: bool, default False
        If `True`, return the `rich.text.Text` object instead of a string.

    ansi: Optional[bool], default None
        If `False`, render without styles even when `ANSI` is enabled.

    Returns
    -------
    The `Pipe(...)` representation, as a string with newlines removed
    or as a `rich.text.Text` object if `as_rich_text` is `True`.
    """
    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str
    from meerschaum.utils.packages import import_rich, attempt_import
    rich = import_rich()
    Text = attempt_import('rich.text').Text

    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
    if not ANSI or (ansi is False):
        styles = dict.fromkeys(styles, '')

    pipe_style = styles['Pipe']
    if pipe_style:
        open_tag, close_tag = ("[" + pipe_style + "]"), ("[/" + pipe_style + "]")
    else:
        open_tag, close_tag = '', ''

    def _markup(fragment):
        """Wrap `fragment` in the `Pipe` style markup tags."""
        return Text.from_markup(open_tag + fragment + close_tag)

    def _styled(fragment, style_key):
        """Return `fragment` as rich text using the style stored under `style_key`."""
        return colored(fragment, style=styles[style_key], as_rich_text=True)

    ### The operands are built and joined in the same order and grouping as a single
    ### left-to-right `+` chain, since appending a pre-joined group carries its base style.
    text_obj = (
        _markup("Pipe(")
        + _styled(("'" + pipe.connector_keys + "'"), 'connector')
        + _markup(", ")
        + _styled(("'" + pipe.metric_key + "'"), 'metric')
    )

    if pipe.location_key is not None:
        location_text = (
            _styled(', ', 'punctuation')
            + _styled(("'" + pipe.location_key + "'"), 'location')
        )
    else:
        location_text = colored('', style='', as_rich_text=True)
    text_obj = text_obj + location_text

    ### Add the `instance=` argument.
    if pipe.instance_keys != get_config('meerschaum', 'instance'):
        instance_text = (
            _styled(', instance=', 'punctuation')
            + _styled(("'" + pipe.instance_keys + "'"), 'instance')
        )
    else:
        instance_text = colored('', style='', as_rich_text=True)
    text_obj = text_obj + instance_text

    text_obj = text_obj + _markup(")")

    if as_rich_text:
        return text_obj
    return rich_text_to_str(text_obj).replace('\n', '')

Return a formatted string for representing a meerschaum.Pipe.

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11        *args,
 12        detect_password: bool = True,
 13        nopretty: bool = False,
 14        **kw
 15    ) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
 22    from meerschaum.utils.warnings import error
 23    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 24    from collections import OrderedDict
 25    import copy, json
 26
 27    if (
 28        len(args) == 1
 29        and
 30        isinstance(args[0], tuple)
 31        and
 32        len(args[0]) == 2
 33        and
 34        isinstance(args[0][0], bool)
 35        and
 36        isinstance(args[0][1], str)
 37    ):
 38        return print_tuple(args[0])
 39
 40    modify = True
 41    rich_pprint = None
 42    if ANSI and not nopretty:
 43        rich = import_rich()
 44        if rich is not None:
 45            rich_pretty = attempt_import('rich.pretty')
 46        if rich_pretty is not None:
 47            def _rich_pprint(*args, **kw):
 48                _console = get_console()
 49                _kw = filter_keywords(_console.print, **kw)
 50                _console.print(*args, **_kw)
 51            rich_pprint = _rich_pprint
 52    elif not nopretty:
 53        pprintpp = attempt_import('pprintpp', warn=False)
 54        try:
 55            _pprint = pprintpp.pprint
 56        except Exception as e:
 57            import pprint as _pprint_module
 58            _pprint = _pprint_module.pprint
 59
 60    func = (
 61        _pprint if rich_pprint is None else rich_pprint
 62    ) if not nopretty else print
 63
 64    try:
 65        args_copy = copy.deepcopy(args)
 66    except Exception as e:
 67        args_copy = args
 68        modify = False
 69    _args = []
 70    for a in args:
 71        c = a
 72        ### convert OrderedDict into dict
 73        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 74            c = dict_from_od(copy.deepcopy(c))
 75        _args.append(c)
 76    args = _args
 77
 78    _args = list(args)
 79    if detect_password and modify:
 80        _args = []
 81        for a in args:
 82            c = a
 83            if isinstance(c, dict):
 84                c = replace_password(copy.deepcopy(c))
 85            if nopretty:
 86                try:
 87                    c = json.dumps(c)
 88                    is_json = True
 89                except Exception as e:
 90                    is_json = False
 91                if not is_json:
 92                    try:
 93                        c = str(c)
 94                    except Exception as e:
 95                        pass
 96            _args.append(c)
 97
 98    ### filter out unsupported keywords
 99    func_kw = filter_keywords(func, **kw) if not nopretty else {}
100    error_msg = None
101    try:
102        func(*_args, **func_kw)
103    except Exception as e:
104        error_msg = e
105    if error_msg is not None:
106        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

def pprint_pipes( pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> None:
 16def pprint_pipes(pipes: PipesDict) -> None:
 17    """Print a stylized tree of a Pipes dictionary.
 18    Supports ANSI and UNICODE global settings."""
 19    from meerschaum.utils.warnings import error
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
 22    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
 23    import copy
 24    rich = import_rich('rich', warn=False)
 25    Text = None
 26    if rich is not None:
 27        rich_text = attempt_import('rich.text', lazy=False)
 28        Text = rich_text.Text
 29
 30    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
 31    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
 32    if not ANSI:
 33        styles = {k: '' for k in styles}
 34    print()
 35
 36    def ascii_print_pipes():
 37        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
 38        (though rich should auto-install when `attempt_import()` is called)."""
 39        asciitree = attempt_import('asciitree')
 40        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
 41        for conn_keys, metrics in pipes.items():
 42            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
 43            if Text is not None:
 44                replace_dict['connector'][_colored_conn_key] = (
 45                    Text(conn_keys, style=styles['connector'])
 46                )
 47            ascii_dict[_colored_conn_key] = {}
 48            for metric, locations in metrics.items():
 49                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
 50                if Text is not None:
 51                    replace_dict['metric'][_colored_metric_key] = (
 52                        Text(metric, style=styles['metric'])
 53                    )
 54                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
 55                for location, pipe in locations.items():
 56                    _location_style = styles[('none' if location is None else 'location')]
 57                    pipe_addendum = '\n         ' + pipe.__repr__() + '\n'
 58                    _colored_location = colored(
 59                        icons['location'] + str(location), style=_location_style
 60                    )
 61                    _colored_location_key = _colored_location + pipe_addendum
 62                    if Text is not None:
 63                        replace_dict['location'][_colored_location] = (
 64                            Text(str(location), style=_location_style)
 65                        )
 66                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}
 67
 68        tree = asciitree.LeftAligned()
 69        output = ''
 70        cols = []
 71
 72        ### This is pretty terrible, unreadable code.
 73        ### Please know that I'm normally better than this.
 74        key_str = (
 75            (Text("     ") if Text is not None else "     ") +
 76            (
 77                Text("Key", style='underline') if Text is not None else
 78                colored("Key", style='underline')
 79            ) + (Text('\n\n  ') if Text is not None else '\n\n  ') +
 80            (
 81                Text("Connector", style=styles['connector']) if Text is not None else
 82                colored("Connector", style=styles['connector'])
 83            ) + (Text('\n   +-- ') if Text is not None else '\n   +-- ') +
 84            (
 85                Text("Metric", style=styles['metric']) if Text is not None else
 86                colored("Metric", style=styles['metric'])
 87            ) + (Text('\n       +-- ') if Text is not None else '\n       +-- ') +
 88            (
 89                Text("Location", style=styles['location']) if Text is not None else
 90                colored("Location", style=styles['location'])
 91            ) + (Text('\n\n') if Text is not None else '\n\n')
 92        )
 93
 94        output += str(key_str)
 95        cols.append(key_str)
 96
 97        def replace_tree_text(tree_str : str) -> Text:
 98            """Replace the colored words with stylized Text instead.
 99            Is not executed if ANSI and UNICODE are disabled."""
100            tree_text = Text(tree_str) if Text is not None else None
101            for k, v in replace_dict.items():
102                for _colored, _text in v.items():
103                    parts = []
104                    lines = tree_text.split(_colored)
105                    for part in lines:
106                        parts += [part, _text]
107                    if lines[-1] != Text(''):
108                        parts = parts[:-1]
109                    _tree_text = Text('')
110                    for part in parts:
111                        _tree_text += part
112                    tree_text = _tree_text
113            return tree_text
114
115        tree_output = ""
116        for k, v in ascii_dict.items():
117            branch = {k : v}
118            tree_output += tree(branch) + '\n\n'
119            if not UNICODE and not ANSI:
120                _col = (Text(tree(branch)) if Text is not None else tree(branch))
121            else:
122                _col = replace_tree_text(tree(branch))
123            cols.append(_col)
124        if len(output) > 0:
125            tree_output = tree_output[:-2]
126        output += tree_output
127
128        if rich is None:
129            return print(output)
130
131        rich_columns = attempt_import('rich.columns')
132        Columns = rich_columns.Columns
133        columns = Columns(cols)
134        get_console().print(columns)
135
136    if not UNICODE:
137        return ascii_print_pipes()
138
139    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
140        'rich.panel',
141        'rich.tree',
142        'rich.text',
143        'rich.columns',
144        'rich.table',
145    )
146    from rich import box
147    Panel = rich_panel.Panel
148    Tree = rich_tree.Tree
149    Text = rich_text.Text
150    Columns = rich_columns.Columns
151    Table = rich_table.Table
152
153    key_panel = Panel(
154        (
155            Text("\n") +
156            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
157            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
158            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
159        ),
160        title = Text(icons['key'] + "Keys", style=styles['guide']),
161        border_style = styles['guide'],
162        expand = True
163    )
164
165    cols = []
166    conn_trees = {}
167    metric_trees = {}
168    pipes = sorted_dict(pipes)
169    for conn_keys, metrics in pipes.items():
170        conn_trees[conn_keys] = Tree(
171            Text(
172                icons['connector'] + conn_keys,
173                style = styles['connector'],
174            ),
175            guide_style = styles['connector']
176        )
177        metric_trees[conn_keys] = {}
178        for metric, locations in metrics.items():
179            metric_trees[conn_keys][metric] = Tree(
180                Text(
181                    icons['metric'] + metric,
182                    style = styles['metric']
183                ),
184                guide_style = styles['metric']
185            )
186            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
187            for location, pipe in locations.items():
188                _location = (
189                    Text(str(location), style=styles['none']) if location is None
190                    else Text(location, style=styles['location'])
191                )
192                _location = (
193                    Text(icons['location'])
194                    + _location + Text('\n')
195                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
196                )
197                metric_trees[conn_keys][metric].add(_location)
198
199    cols += [key_panel]
200    for k, t in conn_trees.items():
201        cols.append(t)
202
203    columns = Columns(cols)
204    get_console().print(columns)

Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.

def translate_rich_to_termcolor(*colors) -> tuple:
def translate_rich_to_termcolor(*colors) -> tuple:
    """Translate between rich and more_termcolor terminology.
    This is probably prone to breaking.

    Parameters
    ----------
    *colors :
        Color strings in rich's terminology, e.g. `'bright_red on white'`.

    Returns
    -------
    A tuple of the translated color strings: every `'bright_'` becomes `'bright '`,
    and each `' on '` clause is split out into its own `'on <color>'` item.

    """
    translated = []
    for color in colors:
        ### handle 'bright', then split the foreground from any 'on' clauses
        foreground, *backgrounds = color.replace('bright_', 'bright ').split(' on ')
        translated.append(foreground)
        translated.extend('on ' + background for background in backgrounds)

    return tuple(translated)

Translate between rich and more_termcolor terminology. This is probably prone to breaking.

Parameters
  • *colors : Color strings in rich's terminology (e.g. 'bright_red on white').

Returns
  • A tuple of the translated color strings.