meerschaum.utils.formatting

Utilities for formatting output text

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utilities for formatting output text
  7"""
  8
  9from __future__ import annotations
 10import platform
 11import os
 12import sys
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable
 15from meerschaum.utils.formatting._shell import make_header
 16from meerschaum.utils.formatting._pprint import pprint
 17from meerschaum.utils.formatting._pipes import (
 18    pprint_pipes,
 19    highlight_pipes,
 20    format_pipe_success_tuple,
 21    print_pipes_results,
 22    extract_stats_from_message,
 23    pipe_repr,
 24)
 25from meerschaum.utils.threading import Lock, RLock
 26
 27_attrs = {
 28    'ANSI': None,
 29    'UNICODE': None,
 30    'CHARSET': None,
 31    'RESET': '\033[0m',
 32}
 33__all__ = sorted([
 34    'ANSI', 'CHARSET', 'UNICODE', 'RESET',
 35    'colored',
 36    'translate_rich_to_termcolor',
 37    'get_console',
 38    'format_success_tuple',
 39    'print_tuple',
 40    'print_options',
 41    'fill_ansi',
 42    'pprint',
 43    'highlight_pipes',
 44    'pprint_pipes',
 45    'make_header',
 46    'pipe_repr',
 47    'print_pipes_results',
 48    'extract_stats_from_message',
 49])
 50__pdoc__ = {}
 51_locks = {
 52    '_colorama_init': RLock(),
 53}
 54
 55
 56def colored_fallback(*args, **kw):
 57    return ' '.join(args)
 58
 59def translate_rich_to_termcolor(*colors) -> tuple:
 60    """Translate between rich and more_termcolor terminology."""
 61    _colors = []
 62    for c in colors:
 63        _c_list = []
 64        ### handle 'bright'
 65        c = c.replace('bright_', 'bright ')
 66
 67        ### handle 'on'
 68        if ' on ' in c:
 69            _on = c.split(' on ')
 70            _colors.append(_on[0])
 71            for _c in _on[1:]:
 72                _c_list.append('on ' + _c)
 73        else:
 74            _c_list += [c]
 75
 76        _colors += _c_list
 77
 78    return tuple(_colors)
 79
 80
 81def rich_text_to_str(text: 'rich.text.Text') -> str:
 82    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
 83    _console = get_console()
 84    if _console is None:
 85        return str(text)
 86    with console.capture() as cap:
 87        console.print(text)
 88    string = cap.get()
 89    return string[:-1]
 90
 91
 92def _init():
 93    """
 94    Initial color settings (mostly for Windows).
 95    """
 96    if platform.system() != "Windows":
 97        return
 98    if 'PYTHONIOENCODING' not in os.environ:
 99        os.environ['PYTHONIOENCODING'] = 'utf-8'
100    if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ:
101        os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8'
102    sys.stdin.reconfigure(encoding='utf-8')
103    sys.stdout.reconfigure(encoding='utf-8')
104    sys.stderr.reconfigure(encoding='utf-8')
105
106    from ctypes import windll
107    k = windll.kernel32
108    k.SetConsoleMode(k.GetStdHandle(-11), 7)
109    os.system("color")
110
111    from meerschaum.utils.packages import attempt_import
112    ### init colorama for Windows color output
113    colorama, more_termcolor = attempt_import(
114        'colorama',
115        'more_termcolor',
116        lazy = False,
117        warn = False,
118        color = False,
119    )
120    try:
121        colorama.init(autoreset=False)
122        success = True
123    except Exception:
124        import traceback
125        traceback.print_exc()
126        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
127        success = False
128
129    if more_termcolor is None:
130        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
131        success = False
132
133    return success
134
135_colorama_init = False
136def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
137    """Apply colors and rich styles to a string.
138    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
139    Otherwise attempt to use the legacy `more_termcolor.colored` method.
140
141    Parameters
142    ----------
143    text: str
144        The string to apply formatting to.
145        
146    *colors:
147        A list of colors to pass to `more_termcolor.colored()`.
148        Has no effect if `style` is provided.
149
150    style: str, default None
151        If provided, pass to `rich` for processing.
152
153    as_rich_text: bool, default False
154        If `True`, return a `rich.Text` object.
155        `style` must be provided.
156        
157    **kw:
158        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
159        
160
161    Returns
162    -------
163    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
164
165    """
166    from meerschaum.utils.packages import import_rich, attempt_import
167    global _colorama_init
168    _init()
169    with _locks['_colorama_init']:
170        if not _colorama_init:
171            _colorama_init = _init()
172
173    if 'style' in kw:
174        rich = import_rich()
175        rich_text = attempt_import('rich.text')
176        text_obj = rich_text.Text(text, **kw)
177        if as_rich_text:
178            return text_obj
179        return rich_text_to_str(text_obj)
180
181    more_termcolor = attempt_import('more_termcolor', lazy=False)
182    try:
183        colored_text = more_termcolor.colored(text, *colors, **kw)
184    except Exception as e:
185        colored_text = None
186
187    if colored_text is not None:
188        return colored_text
189
190    try:
191        _colors = translate_rich_to_termcolor(*colors)
192        colored_text = more_termcolor.colored(text, *_colors, **kw)
193    except Exception as e:
194        colored_text = None
195
196    if colored_text is None:
197        ### NOTE: warn here?
198        return text
199
200    return colored_text
201
202console = None
203def get_console():
204    """Get the rich console."""
205    global console
206    from meerschaum.utils.packages import import_rich, attempt_import
207    rich = import_rich()
208    rich_console = attempt_import('rich.console')
209    try:
210        console = rich_console.Console(force_terminal=True, color_system='truecolor')
211    except Exception:
212        console = None
213    return console
214
215
216def print_tuple(
217    tup: mrsm.SuccessTuple,
218    skip_common: bool = True,
219    common_only: bool = False,
220    upper_padding: int = 1,
221    lower_padding: int = 1,
222    left_padding: int = 1,
223    calm: bool = False,
224    _progress: Optional['rich.progress.Progress'] = None,
225) -> None:
226    """
227    Format `meerschaum.utils.typing.SuccessTuple`.
228
229    Parameters
230    ----------
231    skip_common: bool, default True
232        If `True`, do not print common success tuples (i.e. `(True, "Success")`).
233
234    common_only: bool, default False
235        If `True`, only print if the success tuple is common.
236
237    upper_padding: int, default 0
238        How many newlines to prepend to the message.
239
240    lower_padding: int, default 0
241        How many newlines to append to the message.
242
243    left_padding: int, default 1
244        How mant spaces to preprend to the message.
245
246    calm: bool, default False
247        If `True`, use the default emoji and color scheme.
248
249    """
250    from meerschaum.config.static import STATIC_CONFIG
251    do_print = True
252
253    omit_messages = STATIC_CONFIG['system']['success']['ignore']
254
255    if common_only:
256        skip_common = False
257        do_print = tup[1].strip() in omit_messages
258
259    if skip_common:
260        do_print = tup[1].strip() not in omit_messages
261
262    if not do_print:
263        return
264
265    print(format_success_tuple(
266        tup,
267        upper_padding=upper_padding,
268        lower_padding=lower_padding,
269        calm=calm,
270        _progress=_progress,
271    ))
272
273
274def format_success_tuple(
275    tup: mrsm.SuccessTuple,
276    upper_padding: int = 0,
277    lower_padding: int = 0,
278    left_padding: int = 1,
279    calm: bool = False,
280    _progress: Optional['rich.progress.Progress'] = None,
281) -> str:
282    """
283    Format `meerschaum.utils.typing.SuccessTuple`.
284
285    Parameters
286    ----------
287    upper_padding: int, default 0
288        How many newlines to prepend to the message.
289
290    lower_padding: int, default 0
291        How many newlines to append to the message.
292
293    left_padding: int, default 1
294        How mant spaces to preprend to the message.
295
296    calm: bool, default False
297        If `True`, use the default emoji and color scheme.
298    """
299    _init()
300    try:
301        status = 'success' if tup[0] else 'failure'
302    except TypeError:
303        status = 'failure'
304        tup = None, None
305
306    if calm:
307        status += '_calm'
308
309    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
310    from meerschaum.config import get_config
311    status_config = get_config('formatting', status, patch=True)
312
313    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1]))
314    lines = msg.split('\n')
315    lines = [lines[0]] + [
316        (('    ' + line if not line.startswith(' ') else line))
317        for line in lines[1:]
318    ]
319    if ANSI:
320        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])
321
322    msg = '\n'.join(lines)
323    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
324    return msg
325
326
327def print_options(
328    options: Optional[Iterable[Any]] = None,
329    nopretty: bool = False,
330    no_rich: bool = False,
331    name: str = 'options',
332    header: Optional[str] = None,
333    num_cols: Optional[int] = None,
334    adjust_cols: bool = True,
335    sort_options: bool = False,
336    number_options: bool = False,
337    **kw
338) -> None:
339    """
340    Print items in an iterable as a fancy table.
341
342    Parameters
343    ----------
344    options: Optional[Dict[str, Any]], default None
345        The iterable to be printed.
346
347    nopretty: bool, default False
348        If `True`, don't use fancy formatting.
349
350    no_rich: bool, default False
351        If `True`, don't use `rich` to format the output.
352
353    name: str, default 'options'
354        The text in the default header after `'Available'`.
355
356    header: Optional[str], default None
357        If provided, override `name` and use this as the header text.
358
359    num_cols: Optional[int], default None
360        How many columns in the table. Depends on the terminal size. If `None`, use 8.
361
362    adjust_cols: bool, default True
363        If `True`, adjust the number of columns depending on the terminal size.
364
365    sort_options: bool, default False
366        If `True`, print the options in sorted order.
367
368    number_options: bool, default False
369        If `True`, print the option's number in the list (1 index).
370
371    """
372    from meerschaum.utils.packages import import_rich
373    from meerschaum.utils.formatting import highlight_pipes
374    from meerschaum.utils.misc import get_cols_lines, string_width
375
376    if options is None:
377        options = {}
378    _options = []
379    for o in options:
380        _options.append(str(o))
381    if sort_options:
382        _options = sorted(_options)
383    _header = f"\nAvailable {name}" if header is None else header
384
385    if num_cols is None:
386        num_cols = 8
387
388    def _print_options_no_rich():
389        if not nopretty:
390            print()
391            print(make_header(_header))
392        ### print actions
393        for i, option in enumerate(_options):
394            marker = '-' if not number_options else (str(i + 1) + '.')
395            if not nopretty:
396                print(f"  {marker} ", end="")
397            print(option)
398        if not nopretty:
399            print()
400
401    rich = import_rich()
402    if rich is None or nopretty or no_rich:
403        _print_options_no_rich()
404        return None
405
406    ### Prevent too many options from being truncated on small terminals.
407    if adjust_cols and _options:
408        _cols, _lines = get_cols_lines()
409        while num_cols > 1:
410            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
411            num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options])
412            if num_too_big > int(len(_options) / 3):
413                num_cols -= 1
414                continue
415            break
416
417    from meerschaum.utils.packages import attempt_import
418    rich_table = attempt_import('rich.table')
419    Text = attempt_import('rich.text').Text
420    box = attempt_import('rich.box')
421    Table = rich_table.Table
422
423    if _header is not None:
424        table = Table(
425            title=_header,
426            box=box.SIMPLE,
427            show_header=False,
428            show_footer=False,
429            title_style='',
430            expand = True,
431        )
432    else:
433        table = Table.grid(padding=(0, 2))
434    for i in range(num_cols):
435        table.add_column()
436
437    if len(_options) < 12:
438        ### If fewer than 12 items, use a single column
439        for i, option in enumerate(_options):
440            item = highlight_pipes(option)
441            if number_options:
442                item = str(i + 1) + '. ' + item
443            table.add_row(Text.from_ansi(item))
444    else:
445        ### Otherwise, use multiple columns as before
446        num_rows = (len(_options) + num_cols - 1) // num_cols
447        item_ix = 0
448        for i in range(num_rows):
449            row = []
450            for j in range(num_cols):
451                index = i + j * num_rows
452                if index < len(_options):
453                    item = highlight_pipes(_options[index])
454                    if number_options:
455                        item = str(i + 1) + '. ' + item
456                    row.append(Text.from_ansi(item))
457                    item_ix += 1
458                else:
459                    row.append('')
460            table.add_row(*row)
461
462    get_console().print(table)
463    return None
464
465
466def fill_ansi(string: str, style: str = '') -> str:
467    """
468    Fill in non-formatted segments of ANSI text.
469
470    Parameters
471    ----------
472    string: str
473        A string which contains ANSI escape codes.
474
475    style: str
476        Style arguments to pass to `rich.text.Text`.
477
478    Returns
479    -------
480    A string with ANSI styling applied to the segments which don't yet have a style applied.
481    """
482    from meerschaum.utils.packages import import_rich, attempt_import
483    from meerschaum.utils.misc import iterate_chunks
484    _ = import_rich()
485    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
486    Text = rich_text.Text
487    try:
488        msg = Text.from_ansi(string)
489    except AttributeError:
490        import traceback
491        traceback.print_stack()
492        msg = ''
493    plain_indices = []
494    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
495        left = left_span.end
496        right = right_span.start if not isinstance(right_span, int) else right_span
497        if left != right:
498            plain_indices.append((left, right))
499    if msg.spans:
500        if msg.spans[0].start != 0:
501            plain_indices = [(0, msg.spans[0].start)] + plain_indices
502        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
503            plain_indices.append((msg.spans[-1].end, len(msg)))
504
505    if plain_indices:
506        for left, right in plain_indices:
507            msg.stylize(style, left, right)
508    else:
509        msg = Text(str(msg), style)
510
511    return rich_text_to_str(msg)
512
513
514def __getattr__(name: str) -> str:
515    """
516    Lazily load module-level variables.
517    """
518    if name.startswith('__') and name.endswith('__'):
519        raise AttributeError("Cannot import dunders from this module.")
520
521    if name in _attrs:
522        if _attrs[name] is not None:
523            return _attrs[name]
524        from meerschaum.config import get_config
525        if name.lower() in get_config('formatting'):
526            _attrs[name] = get_config('formatting', name.lower())
527        elif name == 'CHARSET':
528            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
529        return _attrs[name]
530    
531    if name == '__wrapped__':
532        import sys
533        return sys.modules[__name__]
534    if name == '__all__':
535        return __all__
536
537    try:
538        return globals()[name]
539    except KeyError:
540        raise AttributeError(f"Could not find '{name}'")
ANSI
CHARSET
RESET
UNICODE
def colored( text: str, *colors, as_rich_text: bool = False, **kw) -> Union[str, rich.text.Text]:
137def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
138    """Apply colors and rich styles to a string.
139    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
140    Otherwise attempt to use the legacy `more_termcolor.colored` method.
141
142    Parameters
143    ----------
144    text: str
145        The string to apply formatting to.
146        
147    *colors:
148        A list of colors to pass to `more_termcolor.colored()`.
149        Has no effect if `style` is provided.
150
151    style: str, default None
152        If provided, pass to `rich` for processing.
153
154    as_rich_text: bool, default False
155        If `True`, return a `rich.Text` object.
156        `style` must be provided.
157        
158    **kw:
159        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
160        
161
162    Returns
163    -------
164    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
165
166    """
167    from meerschaum.utils.packages import import_rich, attempt_import
168    global _colorama_init
169    _init()
170    with _locks['_colorama_init']:
171        if not _colorama_init:
172            _colorama_init = _init()
173
174    if 'style' in kw:
175        rich = import_rich()
176        rich_text = attempt_import('rich.text')
177        text_obj = rich_text.Text(text, **kw)
178        if as_rich_text:
179            return text_obj
180        return rich_text_to_str(text_obj)
181
182    more_termcolor = attempt_import('more_termcolor', lazy=False)
183    try:
184        colored_text = more_termcolor.colored(text, *colors, **kw)
185    except Exception as e:
186        colored_text = None
187
188    if colored_text is not None:
189        return colored_text
190
191    try:
192        _colors = translate_rich_to_termcolor(*colors)
193        colored_text = more_termcolor.colored(text, *_colors, **kw)
194    except Exception as e:
195        colored_text = None
196
197    if colored_text is None:
198        ### NOTE: warn here?
199        return text
200
201    return colored_text

Apply colors and rich styles to a string. If a style keyword is provided, a rich.text.Text object will be parsed into a string. Otherwise attempt to use the legacy more_termcolor.colored method.

Parameters
  • text (str): The string to apply formatting to.
  • *colors:: A list of colors to pass to more_termcolor.colored(). Has no effect if style is provided.
  • style (str, default None): If provided, pass to rich for processing.
  • as_rich_text (bool, default False): If True, return a rich.Text object. style must be provided.
  • **kw:: Keyword arguments to pass to rich.text.Text or more_termcolor.
Returns
  • An ANSI-formatted string or a rich.text.Text object if as_rich_text is True.
def extract_stats_from_message(message: str, stat_keys: Optional[List[str]] = None) -> Dict[str, int]:
485def extract_stats_from_message(
486    message: str,
487    stat_keys: Optional[List[str]] = None,
488) -> Dict[str, int]:
489    """
490    Given a sync message, return the insert, update, upsert stats from within.
491
492    Parameters
493    ----------
494    message: str
495        The message to parse for statistics.
496
497    stat_keys: Optional[List[str]], default None
498        If provided, search for these words (case insensitive) in the message.
499        Defaults to `['inserted', 'updated', 'upserted']`.
500
501    Returns
502    -------
503    A dictionary mapping the stat keys to the total number of rows affected.
504    """
505    stat_keys = stat_keys or ['inserted', 'updated', 'upserted', 'checked']
506    lines_stats = [extract_stats_from_line(line, stat_keys) for line in message.split('\n')]
507    message_stats = {
508        stat_key: sum(stats.get(stat_key, 0) for stats in lines_stats)
509        for stat_key in stat_keys
510    }
511    return message_stats

Given a sync message, return the insert, update, upsert stats from within.

Parameters
  • message (str): The message to parse for statistics.
  • stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to ['inserted', 'updated', 'upserted'].
Returns
  • A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
467def fill_ansi(string: str, style: str = '') -> str:
468    """
469    Fill in non-formatted segments of ANSI text.
470
471    Parameters
472    ----------
473    string: str
474        A string which contains ANSI escape codes.
475
476    style: str
477        Style arguments to pass to `rich.text.Text`.
478
479    Returns
480    -------
481    A string with ANSI styling applied to the segments which don't yet have a style applied.
482    """
483    from meerschaum.utils.packages import import_rich, attempt_import
484    from meerschaum.utils.misc import iterate_chunks
485    _ = import_rich()
486    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
487    Text = rich_text.Text
488    try:
489        msg = Text.from_ansi(string)
490    except AttributeError:
491        import traceback
492        traceback.print_stack()
493        msg = ''
494    plain_indices = []
495    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
496        left = left_span.end
497        right = right_span.start if not isinstance(right_span, int) else right_span
498        if left != right:
499            plain_indices.append((left, right))
500    if msg.spans:
501        if msg.spans[0].start != 0:
502            plain_indices = [(0, msg.spans[0].start)] + plain_indices
503        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
504            plain_indices.append((msg.spans[-1].end, len(msg)))
505
506    if plain_indices:
507        for left, right in plain_indices:
508            msg.stylize(style, left, right)
509    else:
510        msg = Text(str(msg), style)
511
512    return rich_text_to_str(msg)

Fill in non-formatted segments of ANSI text.

Parameters
  • string (str): A string which contains ANSI escape codes.
  • style (str): Style arguments to pass to rich.text.Text.
Returns
  • A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_success_tuple( tup: Tuple[bool, str], upper_padding: int = 0, lower_padding: int = 0, left_padding: int = 1, calm: bool = False, _progress: Optional[rich.progress.Progress] = None) -> str:
275def format_success_tuple(
276    tup: mrsm.SuccessTuple,
277    upper_padding: int = 0,
278    lower_padding: int = 0,
279    left_padding: int = 1,
280    calm: bool = False,
281    _progress: Optional['rich.progress.Progress'] = None,
282) -> str:
283    """
284    Format `meerschaum.utils.typing.SuccessTuple`.
285
286    Parameters
287    ----------
288    upper_padding: int, default 0
289        How many newlines to prepend to the message.
290
291    lower_padding: int, default 0
292        How many newlines to append to the message.
293
294    left_padding: int, default 1
295        How mant spaces to preprend to the message.
296
297    calm: bool, default False
298        If `True`, use the default emoji and color scheme.
299    """
300    _init()
301    try:
302        status = 'success' if tup[0] else 'failure'
303    except TypeError:
304        status = 'failure'
305        tup = None, None
306
307    if calm:
308        status += '_calm'
309
310    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
311    from meerschaum.config import get_config
312    status_config = get_config('formatting', status, patch=True)
313
314    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1]))
315    lines = msg.split('\n')
316    lines = [lines[0]] + [
317        (('    ' + line if not line.startswith(' ') else line))
318        for line in lines[1:]
319    ]
320    if ANSI:
321        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])
322
323    msg = '\n'.join(lines)
324    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
325    return msg

Format meerschaum.utils.typing.SuccessTuple.

Parameters
  • upper_padding (int, default 0): How many newlines to prepend to the message.
  • lower_padding (int, default 0): How many newlines to append to the message.
  • left_padding (int, default 1): How mant spaces to preprend to the message.
  • calm (bool, default False): If True, use the default emoji and color scheme.
def get_console():
204def get_console():
205    """Get the rich console."""
206    global console
207    from meerschaum.utils.packages import import_rich, attempt_import
208    rich = import_rich()
209    rich_console = attempt_import('rich.console')
210    try:
211        console = rich_console.Console(force_terminal=True, color_system='truecolor')
212    except Exception:
213        console = None
214    return console

Get the rich console.

def highlight_pipes(message: str) -> str:
330def highlight_pipes(message: str) -> str:
331    """
332    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.
333    """
334    if 'Pipe(' not in message:
335        return message
336
337    from meerschaum import Pipe
338    segments = message.split('Pipe(')
339    msg = ''
340    _d = {}
341    for i, segment in enumerate(segments):
342        comma_index = segment.find(',')
343        paren_index = segment.find(')')
344        single_quote_index = segment.find("'")
345        double_quote_index = segment.find('"')
346
347        has_comma = comma_index != -1
348        has_paren = paren_index != -1
349        has_single_quote = single_quote_index != -1
350        has_double_quote = double_quote_index != -1
351        has_quote = has_single_quote or has_double_quote
352        quote_index = (
353            min(single_quote_index, double_quote_index)
354            if has_double_quote and has_single_quote
355            else (single_quote_index if has_single_quote else double_quote_index)
356        )
357
358        has_pipe = (
359            has_comma
360            and
361            has_paren
362            and
363            has_quote
364            and not
365            (comma_index > paren_index or quote_index > paren_index)
366        )
367
368        if has_pipe:
369            code = "_d['pipe'] = Pipe(" + segment[:paren_index + 1]
370            try:
371                exec(code)
372                _to_add = pipe_repr(_d['pipe']) + segment[paren_index + 1:]
373                _ = _d.pop('pipe', None)
374            except Exception as e:
375                _to_add = 'Pipe(' + segment
376            msg += _to_add
377            continue
378        msg += segment
379    return msg

Add syntax highlighting to an info message containing stringified meerschaum.Pipe objects.

def make_header(message: str, ruler: str = '─') -> str:
15def make_header(
16    message: str,
17    ruler: str = '─',
18) -> str:
19    """Format a message string with a ruler.
20    Length of the ruler is the length of the longest word.
21    
22    Example:
23        'My\nheader' -> 'My\nheader\n──────'
24    """
25
26    from meerschaum.utils.formatting import ANSI, UNICODE, colored
27    if not UNICODE:
28        ruler = '-'
29    words = message.split('\n')
30    max_length = 0
31    for w in words:
32        length = len(w)
33        if length > max_length:
34            max_length = length
35
36    s = message + "\n"
37    for i in range(max_length):
38        s += ruler
39    return s

Format a message string with a ruler. Length of the ruler is the length of the longest word.

Example:
    'My

header' -> 'My header ──────'

def pipe_repr( pipe: meerschaum.Pipe, as_rich_text: bool = False, ansi: Optional[bool] = None) -> Union[str, rich.text.Text]:
279def pipe_repr(
280    pipe: mrsm.Pipe,
281    as_rich_text: bool = False,
282    ansi: Optional[bool] = None,
283) -> Union[str, 'rich.text.Text']:
284    """
285    Return a formatted string for representing a `meerschaum.Pipe`.
286    """
287    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, colored, rich_text_to_str
288    from meerschaum.utils.packages import import_rich, attempt_import
289    rich = import_rich()
290    Text = attempt_import('rich.text').Text
291
292    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
293    if not ANSI or (ansi is False):
294        styles = {k: '' for k in styles}
295    _pipe_style_prefix, _pipe_style_suffix = (
296        (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe']
297        else ('', '')
298    )
299    text_obj = (
300        Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix)
301        + colored(("'" + pipe.connector_keys + "'"), style=styles['connector'], as_rich_text=True)
302        + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix)
303        + colored(("'" + pipe.metric_key + "'"), style=styles['metric'], as_rich_text=True)
304        + (
305            (
306                colored(', ', style=styles['punctuation'], as_rich_text=True)
307                + colored(
308                    ("'" + pipe.location_key + "'"),
309                    style=styles['location'], as_rich_text=True
310                )
311            ) if pipe.location_key is not None
312            else colored('', style='', as_rich_text=True)
313        ) + (
314            ( ### Add the `instance=` argument.
315                colored(', instance=', style=styles['punctuation'], as_rich_text=True)
316                + colored(
317                    ("'" + pipe.instance_keys + "'"),
318                    style=styles['instance'], as_rich_text=True
319                )
320            ) if pipe.instance_keys != get_config('meerschaum', 'instance')
321            else colored('', style='', as_rich_text=True)
322        )
323        + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix)
324    )
325    if as_rich_text:
326        return text_obj
327    return rich_text_to_str(text_obj).replace('\n', '')

Return a formatted string for representing a meerschaum.Pipe.

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11    *args,
 12    detect_password: bool = True,
 13    nopretty: bool = False,
 14    **kw
 15) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    import copy
 21    import json
 22    from meerschaum.utils.packages import attempt_import, import_rich
 23    from meerschaum.utils.formatting import ANSI, get_console, print_tuple
 24    from meerschaum.utils.warnings import error
 25    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 26    from collections import OrderedDict
 27
 28    if (
 29        len(args) == 1
 30        and
 31        isinstance(args[0], tuple)
 32        and
 33        len(args[0]) == 2
 34        and
 35        isinstance(args[0][0], bool)
 36        and
 37        isinstance(args[0][1], str)
 38    ):
 39        return print_tuple(args[0], **filter_keywords(print_tuple, **kw))
 40
 41    modify = True
 42    rich_pprint = None
 43    if ANSI and not nopretty:
 44        rich = import_rich()
 45        if rich is not None:
 46            rich_pretty = attempt_import('rich.pretty')
 47        if rich_pretty is not None:
 48            def _rich_pprint(*args, **kw):
 49                _console = get_console()
 50                _kw = filter_keywords(_console.print, **kw)
 51                _console.print(*args, **_kw)
 52            rich_pprint = _rich_pprint
 53    elif not nopretty:
 54        pprintpp = attempt_import('pprintpp', warn=False)
 55        try:
 56            _pprint = pprintpp.pprint
 57        except Exception :
 58            import pprint as _pprint_module
 59            _pprint = _pprint_module.pprint
 60
 61    func = (
 62        _pprint if rich_pprint is None else rich_pprint
 63    ) if not nopretty else print
 64
 65    try:
 66        args_copy = copy.deepcopy(args)
 67    except Exception:
 68        args_copy = args
 69        modify = False
 70    _args = []
 71    for a in args:
 72        c = a
 73        ### convert OrderedDict into dict
 74        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 75            c = dict_from_od(copy.deepcopy(c))
 76        _args.append(c)
 77    args = _args
 78
 79    _args = list(args)
 80    if detect_password and modify:
 81        _args = []
 82        for a in args:
 83            c = a
 84            if isinstance(c, dict):
 85                c = replace_password(copy.deepcopy(c))
 86            if nopretty:
 87                try:
 88                    c = json.dumps(c)
 89                    is_json = True
 90                except Exception:
 91                    is_json = False
 92                if not is_json:
 93                    try:
 94                        c = str(c)
 95                    except Exception:
 96                        pass
 97            _args.append(c)
 98
 99    ### filter out unsupported keywords
100    func_kw = filter_keywords(func, **kw) if not nopretty else {}
101    error_msg = None
102    try:
103        func(*_args, **func_kw)
104    except Exception as e:
105        error_msg = e
106    if error_msg is not None:
107        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

def pprint_pipes( pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> None:
 16def pprint_pipes(pipes: PipesDict) -> None:
 17    """Print a stylized tree of a Pipes dictionary.
 18    Supports ANSI and UNICODE global settings."""
 19    from meerschaum.utils.warnings import error
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
 22    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
 23    import copy
 24    rich = import_rich('rich', warn=False)
 25    Text = None
 26    if rich is not None:
 27        rich_text = attempt_import('rich.text', lazy=False)
 28        Text = rich_text.Text
 29
 30    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
 31    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
 32    if not ANSI:
 33        styles = {k: '' for k in styles}
 34    print()
 35
 36    def ascii_print_pipes():
 37        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
 38        (though rich should auto-install when `attempt_import()` is called)."""
 39        asciitree = attempt_import('asciitree')
 40        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
 41        for conn_keys, metrics in pipes.items():
 42            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
 43            if Text is not None:
 44                replace_dict['connector'][_colored_conn_key] = (
 45                    Text(conn_keys, style=styles['connector'])
 46                )
 47            ascii_dict[_colored_conn_key] = {}
 48            for metric, locations in metrics.items():
 49                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
 50                if Text is not None:
 51                    replace_dict['metric'][_colored_metric_key] = (
 52                        Text(metric, style=styles['metric'])
 53                    )
 54                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
 55                for location, pipe in locations.items():
 56                    _location_style = styles[('none' if location is None else 'location')]
 57                    pipe_addendum = '\n         ' + pipe.__repr__() + '\n'
 58                    _colored_location = colored(
 59                        icons['location'] + str(location), style=_location_style
 60                    )
 61                    _colored_location_key = _colored_location + pipe_addendum
 62                    if Text is not None:
 63                        replace_dict['location'][_colored_location] = (
 64                            Text(str(location), style=_location_style)
 65                        )
 66                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}
 67
 68        tree = asciitree.LeftAligned()
 69        output = ''
 70        cols = []
 71
 72        ### This is pretty terrible, unreadable code.
 73        ### Please know that I'm normally better than this.
 74        key_str = (
 75            (Text("     ") if Text is not None else "     ") +
 76            (
 77                Text("Key", style='underline') if Text is not None else
 78                colored("Key", style='underline')
 79            ) + (Text('\n\n  ') if Text is not None else '\n\n  ') +
 80            (
 81                Text("Connector", style=styles['connector']) if Text is not None else
 82                colored("Connector", style=styles['connector'])
 83            ) + (Text('\n   +-- ') if Text is not None else '\n   +-- ') +
 84            (
 85                Text("Metric", style=styles['metric']) if Text is not None else
 86                colored("Metric", style=styles['metric'])
 87            ) + (Text('\n       +-- ') if Text is not None else '\n       +-- ') +
 88            (
 89                Text("Location", style=styles['location']) if Text is not None else
 90                colored("Location", style=styles['location'])
 91            ) + (Text('\n\n') if Text is not None else '\n\n')
 92        )
 93
 94        output += str(key_str)
 95        cols.append(key_str)
 96
 97        def replace_tree_text(tree_str : str) -> Text:
 98            """Replace the colored words with stylized Text instead.
 99            Is not executed if ANSI and UNICODE are disabled."""
100            tree_text = Text(tree_str) if Text is not None else None
101            for k, v in replace_dict.items():
102                for _colored, _text in v.items():
103                    parts = []
104                    lines = tree_text.split(_colored)
105                    for part in lines:
106                        parts += [part, _text]
107                    if lines[-1] != Text(''):
108                        parts = parts[:-1]
109                    _tree_text = Text('')
110                    for part in parts:
111                        _tree_text += part
112                    tree_text = _tree_text
113            return tree_text
114
115        tree_output = ""
116        for k, v in ascii_dict.items():
117            branch = {k : v}
118            tree_output += tree(branch) + '\n\n'
119            if not UNICODE and not ANSI:
120                _col = (Text(tree(branch)) if Text is not None else tree(branch))
121            else:
122                _col = replace_tree_text(tree(branch))
123            cols.append(_col)
124        if len(output) > 0:
125            tree_output = tree_output[:-2]
126        output += tree_output
127
128        if rich is None:
129            return print(output)
130
131        rich_columns = attempt_import('rich.columns')
132        Columns = rich_columns.Columns
133        columns = Columns(cols)
134        get_console().print(columns)
135
136    if not UNICODE:
137        return ascii_print_pipes()
138
139    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
140        'rich.panel',
141        'rich.tree',
142        'rich.text',
143        'rich.columns',
144        'rich.table',
145    )
146    from rich import box
147    Panel = rich_panel.Panel
148    Tree = rich_tree.Tree
149    Text = rich_text.Text
150    Columns = rich_columns.Columns
151    Table = rich_table.Table
152
153    key_panel = Panel(
154        (
155            Text("\n") +
156            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
157            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
158            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
159        ),
160        title = Text(icons['key'] + "Keys", style=styles['guide']),
161        border_style = styles['guide'],
162        expand = True
163    )
164
165    cols = []
166    conn_trees = {}
167    metric_trees = {}
168    pipes = sorted_dict(pipes)
169    for conn_keys, metrics in pipes.items():
170        conn_trees[conn_keys] = Tree(
171            Text(
172                icons['connector'] + conn_keys,
173                style = styles['connector'],
174            ),
175            guide_style = styles['connector']
176        )
177        metric_trees[conn_keys] = {}
178        for metric, locations in metrics.items():
179            metric_trees[conn_keys][metric] = Tree(
180                Text(
181                    icons['metric'] + metric,
182                    style = styles['metric']
183                ),
184                guide_style = styles['metric']
185            )
186            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
187            for location, pipe in locations.items():
188                _location = (
189                    Text(str(location), style=styles['none']) if location is None
190                    else Text(location, style=styles['location'])
191                )
192                _location = (
193                    Text(icons['location'])
194                    + _location + Text('\n')
195                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
196                )
197                metric_trees[conn_keys][metric].add(_location)
198
199    cols += [key_panel]
200    for k, t in conn_trees.items():
201        cols.append(t)
202
203    columns = Columns(cols)
204    get_console().print(columns)

Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.

def translate_rich_to_termcolor(*colors) -> tuple:
60def translate_rich_to_termcolor(*colors) -> tuple:
61    """Translate between rich and more_termcolor terminology."""
62    _colors = []
63    for c in colors:
64        _c_list = []
65        ### handle 'bright'
66        c = c.replace('bright_', 'bright ')
67
68        ### handle 'on'
69        if ' on ' in c:
70            _on = c.split(' on ')
71            _colors.append(_on[0])
72            for _c in _on[1:]:
73                _c_list.append('on ' + _c)
74        else:
75            _c_list += [c]
76
77        _colors += _c_list
78
79    return tuple(_colors)

Translate between rich and more_termcolor terminology.