meerschaum.utils.formatting

Utilities for formatting output text

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Utilities for formatting output text
  7"""
  8
  9from __future__ import annotations
 10import platform
 11import os
 12import sys
 13import meerschaum as mrsm
 14from meerschaum.utils.typing import Optional, Union, Any, Dict, Iterable
 15from meerschaum.utils.formatting._shell import make_header
 16from meerschaum.utils.formatting._pprint import pprint
 17from meerschaum.utils.formatting._pipes import (
 18    pprint_pipes,
 19    highlight_pipes,
 20    format_pipe_success_tuple,
 21    print_pipes_results,
 22    extract_stats_from_message,
 23    pipe_repr,
 24)
 25from meerschaum.utils.threading import Lock, RLock
 26
 27_attrs = {
 28    'ANSI': None,
 29    'UNICODE': None,
 30    'CHARSET': None,
 31    'RESET': '\033[0m',
 32}
 33__all__ = sorted([
 34    'ANSI', 'CHARSET', 'UNICODE', 'RESET',
 35    'colored',
 36    'translate_rich_to_termcolor',
 37    'get_console',
 38    'format_success_tuple',
 39    'print_tuple',
 40    'print_options',
 41    'fill_ansi',
 42    'pprint',
 43    'highlight_pipes',
 44    'pprint_pipes',
 45    'make_header',
 46    'pipe_repr',
 47    'print_pipes_results',
 48    'extract_stats_from_message',
 49])
 50__pdoc__ = {}
 51_locks = {
 52    '_colorama_init': RLock(),
 53}
 54
 55
 56def colored_fallback(*args, **kw):
 57    return ' '.join(args)
 58
 59def translate_rich_to_termcolor(*colors) -> tuple:
 60    """Translate between rich and more_termcolor terminology."""
 61    _colors = []
 62    for c in colors:
 63        _c_list = []
 64        ### handle 'bright'
 65        c = c.replace('bright_', 'bright ')
 66
 67        ### handle 'on'
 68        if ' on ' in c:
 69            _on = c.split(' on ')
 70            _colors.append(_on[0])
 71            for _c in _on[1:]:
 72                _c_list.append('on ' + _c)
 73        else:
 74            _c_list += [c]
 75
 76        _colors += _c_list
 77
 78    return tuple(_colors)
 79
 80
 81def rich_text_to_str(text: 'rich.text.Text') -> str:
 82    """Convert a `rich.text.Text` object to a string with ANSI in-tact."""
 83    _console = get_console()
 84    if _console is None:
 85        return str(text)
 86    with console.capture() as cap:
 87        console.print(text)
 88    string = cap.get()
 89    return string[:-1]
 90
 91
 92def _init():
 93    """
 94    Initial color settings (mostly for Windows).
 95    """
 96    if platform.system() != "Windows":
 97        return
 98    if 'PYTHONIOENCODING' not in os.environ:
 99        os.environ['PYTHONIOENCODING'] = 'utf-8'
100    if 'PYTHONLEGACYWINDOWSSTDIO' not in os.environ:
101        os.environ['PYTHONLEGACYWINDOWSSTDIO'] = 'utf-8'
102    sys.stdin.reconfigure(encoding='utf-8')
103    sys.stdout.reconfigure(encoding='utf-8')
104    sys.stderr.reconfigure(encoding='utf-8')
105
106    from ctypes import windll
107    k = windll.kernel32
108    k.SetConsoleMode(k.GetStdHandle(-11), 7)
109    os.system("color")
110
111    from meerschaum.utils.packages import attempt_import
112    ### init colorama for Windows color output
113    colorama, more_termcolor = attempt_import(
114        'colorama',
115        'more_termcolor',
116        lazy = False,
117        warn = False,
118        color = False,
119    )
120    try:
121        colorama.init(autoreset=False)
122        success = True
123    except Exception:
124        import traceback
125        traceback.print_exc()
126        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
127        success = False
128
129    if more_termcolor is None:
130        _attrs['ANSI'], _attrs['UNICODE'], _attrs['CHARSET'] = False, False, 'ascii'
131        success = False
132
133    return success
134
135_colorama_init = False
136def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
137    """Apply colors and rich styles to a string.
138    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
139    Otherwise attempt to use the legacy `more_termcolor.colored` method.
140
141    Parameters
142    ----------
143    text: str
144        The string to apply formatting to.
145        
146    *colors:
147        A list of colors to pass to `more_termcolor.colored()`.
148        Has no effect if `style` is provided.
149
150    style: str, default None
151        If provided, pass to `rich` for processing.
152
153    as_rich_text: bool, default False
154        If `True`, return a `rich.Text` object.
155        `style` must be provided.
156        
157    **kw:
158        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
159        
160
161    Returns
162    -------
163    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
164
165    """
166    from meerschaum.utils.packages import import_rich, attempt_import
167    global _colorama_init
168    _init()
169    with _locks['_colorama_init']:
170        if not _colorama_init:
171            _colorama_init = _init()
172
173    if 'style' in kw:
174        rich = import_rich()
175        rich_text = attempt_import('rich.text')
176        text_obj = rich_text.Text(text, **kw)
177        if as_rich_text:
178            return text_obj
179        return rich_text_to_str(text_obj)
180
181    more_termcolor = attempt_import('more_termcolor', lazy=False)
182    try:
183        colored_text = more_termcolor.colored(text, *colors, **kw)
184    except Exception as e:
185        colored_text = None
186
187    if colored_text is not None:
188        return colored_text
189
190    try:
191        _colors = translate_rich_to_termcolor(*colors)
192        colored_text = more_termcolor.colored(text, *_colors, **kw)
193    except Exception as e:
194        colored_text = None
195
196    if colored_text is None:
197        ### NOTE: warn here?
198        return text
199
200    return colored_text
201
202console = None
203def get_console():
204    """Get the rich console."""
205    global console
206    from meerschaum.utils.packages import import_rich, attempt_import
207    rich = import_rich()
208    rich_console = attempt_import('rich.console')
209    try:
210        console = rich_console.Console(force_terminal=True, color_system='truecolor')
211    except Exception:
212        import traceback
213        traceback.print_exc()
214        console = None
215    return console
216
217
218def print_tuple(
219    tup: mrsm.SuccessTuple,
220    skip_common: bool = True,
221    common_only: bool = False,
222    upper_padding: int = 1,
223    lower_padding: int = 1,
224    left_padding: int = 1,
225    calm: bool = False,
226    _progress: Optional['rich.progress.Progress'] = None,
227) -> None:
228    """
229    Format `meerschaum.utils.typing.SuccessTuple`.
230
231    Parameters
232    ----------
233    skip_common: bool, default True
234        If `True`, do not print common success tuples (i.e. `(True, "Success")`).
235
236    common_only: bool, default False
237        If `True`, only print if the success tuple is common.
238
239    upper_padding: int, default 0
240        How many newlines to prepend to the message.
241
242    lower_padding: int, default 0
243        How many newlines to append to the message.
244
245    left_padding: int, default 1
246        How mant spaces to preprend to the message.
247
248    calm: bool, default False
249        If `True`, use the default emoji and color scheme.
250
251    """
252    from meerschaum._internal.static import STATIC_CONFIG
253    do_print = True
254
255    omit_messages = STATIC_CONFIG['system']['success']['ignore']
256
257    if common_only:
258        skip_common = False
259        do_print = tup[1].strip() in omit_messages
260
261    if skip_common:
262        do_print = tup[1].strip() not in omit_messages
263
264    if not do_print:
265        return
266
267    print(format_success_tuple(
268        tup,
269        upper_padding=upper_padding,
270        lower_padding=lower_padding,
271        calm=calm,
272        _progress=_progress,
273    ))
274
275
276def format_success_tuple(
277    tup: mrsm.SuccessTuple,
278    upper_padding: int = 0,
279    lower_padding: int = 0,
280    left_padding: int = 1,
281    calm: bool = False,
282    _progress: Optional['rich.progress.Progress'] = None,
283) -> str:
284    """
285    Format `meerschaum.utils.typing.SuccessTuple`.
286
287    Parameters
288    ----------
289    upper_padding: int, default 0
290        How many newlines to prepend to the message.
291
292    lower_padding: int, default 0
293        How many newlines to append to the message.
294
295    left_padding: int, default 1
296        How mant spaces to preprend to the message.
297
298    calm: bool, default False
299        If `True`, use the default emoji and color scheme.
300    """
301    _init()
302    try:
303        status = 'success' if tup[0] else 'failure'
304    except TypeError:
305        status = 'failure'
306        tup = None, None
307
308    if calm:
309        status += '_calm'
310
311    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
312    from meerschaum.config import get_config
313    status_config = get_config('formatting', status, patch=True)
314
315    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1]))
316    lines = msg.split('\n')
317    lines = [lines[0]] + [
318        (('    ' + line if not line.startswith(' ') else line))
319        for line in lines[1:]
320    ]
321    if ANSI:
322        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])
323
324    msg = '\n'.join(lines)
325    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
326    return msg
327
328
329def print_options(
330    options: Optional[Iterable[Any]] = None,
331    nopretty: bool = False,
332    no_rich: bool = False,
333    name: str = 'options',
334    header: Optional[str] = None,
335    num_cols: Optional[int] = None,
336    adjust_cols: bool = True,
337    sort_options: bool = False,
338    number_options: bool = False,
339    **kw
340) -> None:
341    """
342    Print items in an iterable as a fancy table.
343
344    Parameters
345    ----------
346    options: Optional[Dict[str, Any]], default None
347        The iterable to be printed.
348
349    nopretty: bool, default False
350        If `True`, don't use fancy formatting.
351
352    no_rich: bool, default False
353        If `True`, don't use `rich` to format the output.
354
355    name: str, default 'options'
356        The text in the default header after `'Available'`.
357
358    header: Optional[str], default None
359        If provided, override `name` and use this as the header text.
360
361    num_cols: Optional[int], default None
362        How many columns in the table. Depends on the terminal size. If `None`, use 8.
363
364    adjust_cols: bool, default True
365        If `True`, adjust the number of columns depending on the terminal size.
366
367    sort_options: bool, default False
368        If `True`, print the options in sorted order.
369
370    number_options: bool, default False
371        If `True`, print the option's number in the list (1 index).
372
373    """
374    from meerschaum.utils.packages import import_rich
375    from meerschaum.utils.formatting import highlight_pipes
376    from meerschaum.utils.misc import get_cols_lines, string_width
377
378    if options is None:
379        options = {}
380    _options = []
381    for o in options:
382        _options.append(str(o))
383    if sort_options:
384        _options = sorted(_options)
385    _header = f"\nAvailable {name}" if header is None else header
386
387    if num_cols is None:
388        num_cols = 8
389
390    def _print_options_no_rich():
391        if not nopretty:
392            print()
393            print(make_header(_header))
394        ### print actions
395        for i, option in enumerate(_options):
396            marker = '-' if not number_options else (str(i + 1) + '.')
397            if not nopretty:
398                print(f"  {marker} ", end="")
399            print(option)
400        if not nopretty:
401            print()
402
403    rich = import_rich()
404    if rich is None or nopretty or no_rich:
405        _print_options_no_rich()
406        return None
407
408    ### Prevent too many options from being truncated on small terminals.
409    if adjust_cols and _options:
410        _cols, _lines = get_cols_lines()
411        while num_cols > 1:
412            cell_len = int(((_cols - 4) - (3 * (num_cols - 1))) / num_cols)
413            num_too_big = sum([(1 if string_width(o) > cell_len else 0) for o in _options])
414            if num_too_big > int(len(_options) / 3):
415                num_cols -= 1
416                continue
417            break
418
419    from meerschaum.utils.packages import attempt_import
420    rich_table = attempt_import('rich.table')
421    Text = attempt_import('rich.text').Text
422    box = attempt_import('rich.box')
423    Table = rich_table.Table
424
425    if _header is not None:
426        table = Table(
427            title=_header,
428            box=box.SIMPLE,
429            show_header=False,
430            show_footer=False,
431            title_style='',
432            expand = True,
433        )
434    else:
435        table = Table.grid(padding=(0, 2))
436    for i in range(num_cols):
437        table.add_column()
438
439    if len(_options) < 12:
440        ### If fewer than 12 items, use a single column
441        for i, option in enumerate(_options):
442            item = highlight_pipes(option)
443            if number_options:
444                item = str(i + 1) + '. ' + item
445            table.add_row(Text.from_ansi(item))
446    else:
447        ### Otherwise, use multiple columns as before
448        num_rows = (len(_options) + num_cols - 1) // num_cols
449        item_ix = 0
450        for i in range(num_rows):
451            row = []
452            for j in range(num_cols):
453                index = i + j * num_rows
454                if index < len(_options):
455                    item = highlight_pipes(_options[index])
456                    if number_options:
457                        item = str(i + 1) + '. ' + item
458                    row.append(Text.from_ansi(item))
459                    item_ix += 1
460                else:
461                    row.append('')
462            table.add_row(*row)
463
464    get_console().print(table)
465    return None
466
467
468def fill_ansi(string: str, style: str = '') -> str:
469    """
470    Fill in non-formatted segments of ANSI text.
471
472    Parameters
473    ----------
474    string: str
475        A string which contains ANSI escape codes.
476
477    style: str
478        Style arguments to pass to `rich.text.Text`.
479
480    Returns
481    -------
482    A string with ANSI styling applied to the segments which don't yet have a style applied.
483    """
484    from meerschaum.utils.packages import import_rich, attempt_import
485    from meerschaum.utils.misc import iterate_chunks
486    _ = import_rich()
487    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
488    Text = rich_text.Text
489    try:
490        msg = Text.from_ansi(string)
491    except AttributeError:
492        import traceback
493        traceback.print_exc()
494        msg = ''
495
496    plain_indices = []
497    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
498        left = left_span.end
499        right = right_span.start if not isinstance(right_span, int) else right_span
500        if left != right:
501            plain_indices.append((left, right))
502    if msg.spans:
503        if msg.spans[0].start != 0:
504            plain_indices = [(0, msg.spans[0].start)] + plain_indices
505        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
506            plain_indices.append((msg.spans[-1].end, len(msg)))
507
508    if plain_indices:
509        for left, right in plain_indices:
510            msg.stylize(style, left, right)
511    else:
512        msg = Text(str(msg), style)
513
514    return rich_text_to_str(msg)
515
516
517def __getattr__(name: str) -> str:
518    """
519    Lazily load module-level variables.
520    """
521    if name.startswith('__') and name.endswith('__'):
522        raise AttributeError("Cannot import dunders from this module.")
523
524    if name in _attrs:
525        if _attrs[name] is not None:
526            return _attrs[name]
527        from meerschaum.config import get_config
528        if name.lower() in get_config('formatting'):
529            _attrs[name] = get_config('formatting', name.lower())
530        elif name == 'CHARSET':
531            _attrs[name] = 'unicode' if __getattr__('UNICODE') else 'ascii'
532        return _attrs[name]
533    
534    if name == '__wrapped__':
535        import sys
536        return sys.modules[__name__]
537    if name == '__all__':
538        return __all__
539
540    try:
541        return globals()[name]
542    except KeyError:
543        raise AttributeError(f"Could not find '{name}'")
ANSI
CHARSET
RESET
UNICODE
def colored( text: str, *colors, as_rich_text: bool = False, **kw) -> Union[str, rich.text.Text]:
137def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'rich.text.Text']:
138    """Apply colors and rich styles to a string.
139    If a `style` keyword is provided, a `rich.text.Text` object will be parsed into a string.
140    Otherwise attempt to use the legacy `more_termcolor.colored` method.
141
142    Parameters
143    ----------
144    text: str
145        The string to apply formatting to.
146        
147    *colors:
148        A list of colors to pass to `more_termcolor.colored()`.
149        Has no effect if `style` is provided.
150
151    style: str, default None
152        If provided, pass to `rich` for processing.
153
154    as_rich_text: bool, default False
155        If `True`, return a `rich.Text` object.
156        `style` must be provided.
157        
158    **kw:
159        Keyword arguments to pass to `rich.text.Text` or `more_termcolor`.
160        
161
162    Returns
163    -------
164    An ANSI-formatted string or a `rich.text.Text` object if `as_rich_text` is `True`.
165
166    """
167    from meerschaum.utils.packages import import_rich, attempt_import
168    global _colorama_init
169    _init()
170    with _locks['_colorama_init']:
171        if not _colorama_init:
172            _colorama_init = _init()
173
174    if 'style' in kw:
175        rich = import_rich()
176        rich_text = attempt_import('rich.text')
177        text_obj = rich_text.Text(text, **kw)
178        if as_rich_text:
179            return text_obj
180        return rich_text_to_str(text_obj)
181
182    more_termcolor = attempt_import('more_termcolor', lazy=False)
183    try:
184        colored_text = more_termcolor.colored(text, *colors, **kw)
185    except Exception as e:
186        colored_text = None
187
188    if colored_text is not None:
189        return colored_text
190
191    try:
192        _colors = translate_rich_to_termcolor(*colors)
193        colored_text = more_termcolor.colored(text, *_colors, **kw)
194    except Exception as e:
195        colored_text = None
196
197    if colored_text is None:
198        ### NOTE: warn here?
199        return text
200
201    return colored_text

Apply colors and rich styles to a string. If a style keyword is provided, a rich.text.Text object will be parsed into a string. Otherwise attempt to use the legacy more_termcolor.colored method.

Parameters
  • text (str): The string to apply formatting to.
  • *colors:: A list of colors to pass to more_termcolor.colored(). Has no effect if style is provided.
  • style (str, default None): If provided, pass to rich for processing.
  • as_rich_text (bool, default False): If True, return a rich.Text object. style must be provided.
  • **kw:: Keyword arguments to pass to rich.text.Text or more_termcolor.
Returns
  • An ANSI-formatted string or a rich.text.Text object if as_rich_text is True.
def extract_stats_from_message(message: str, stat_keys: Optional[List[str]] = None) -> Dict[str, int]:
487def extract_stats_from_message(
488    message: str,
489    stat_keys: Optional[List[str]] = None,
490) -> Dict[str, int]:
491    """
492    Given a sync message, return the insert, update, upsert stats from within.
493
494    Parameters
495    ----------
496    message: str
497        The message to parse for statistics.
498
499    stat_keys: Optional[List[str]], default None
500        If provided, search for these words (case insensitive) in the message.
501        Defaults to `['inserted', 'updated', 'upserted']`.
502
503    Returns
504    -------
505    A dictionary mapping the stat keys to the total number of rows affected.
506    """
507    stat_keys = stat_keys or ['inserted', 'updated', 'upserted', 'checked']
508    lines_stats = [extract_stats_from_line(line, stat_keys) for line in message.split('\n')]
509    message_stats = {
510        stat_key: sum(stats.get(stat_key, 0) for stats in lines_stats)
511        for stat_key in stat_keys
512    }
513    return message_stats

Given a sync message, return the insert, update, upsert stats from within.

Parameters
  • message (str): The message to parse for statistics.
  • stat_keys (Optional[List[str]], default None): If provided, search for these words (case insensitive) in the message. Defaults to ['inserted', 'updated', 'upserted'].
Returns
  • A dictionary mapping the stat keys to the total number of rows affected.
def fill_ansi(string: str, style: str = '') -> str:
469def fill_ansi(string: str, style: str = '') -> str:
470    """
471    Fill in non-formatted segments of ANSI text.
472
473    Parameters
474    ----------
475    string: str
476        A string which contains ANSI escape codes.
477
478    style: str
479        Style arguments to pass to `rich.text.Text`.
480
481    Returns
482    -------
483    A string with ANSI styling applied to the segments which don't yet have a style applied.
484    """
485    from meerschaum.utils.packages import import_rich, attempt_import
486    from meerschaum.utils.misc import iterate_chunks
487    _ = import_rich()
488    rich_ansi, rich_text = attempt_import('rich.ansi', 'rich.text')
489    Text = rich_text.Text
490    try:
491        msg = Text.from_ansi(string)
492    except AttributeError:
493        import traceback
494        traceback.print_exc()
495        msg = ''
496
497    plain_indices = []
498    for left_span, right_span in iterate_chunks(msg.spans, 2, fillvalue=len(msg)):
499        left = left_span.end
500        right = right_span.start if not isinstance(right_span, int) else right_span
501        if left != right:
502            plain_indices.append((left, right))
503    if msg.spans:
504        if msg.spans[0].start != 0:
505            plain_indices = [(0, msg.spans[0].start)] + plain_indices
506        if plain_indices and msg.spans[-1].end != len(msg) and plain_indices[-1][1] != len(msg):
507            plain_indices.append((msg.spans[-1].end, len(msg)))
508
509    if plain_indices:
510        for left, right in plain_indices:
511            msg.stylize(style, left, right)
512    else:
513        msg = Text(str(msg), style)
514
515    return rich_text_to_str(msg)

Fill in non-formatted segments of ANSI text.

Parameters
  • string (str): A string which contains ANSI escape codes.
  • style (str): Style arguments to pass to rich.text.Text.
Returns
  • A string with ANSI styling applied to the segments which don't yet have a style applied.
def format_success_tuple( tup: Tuple[bool, str], upper_padding: int = 0, lower_padding: int = 0, left_padding: int = 1, calm: bool = False, _progress: Optional[rich.progress.Progress] = None) -> str:
277def format_success_tuple(
278    tup: mrsm.SuccessTuple,
279    upper_padding: int = 0,
280    lower_padding: int = 0,
281    left_padding: int = 1,
282    calm: bool = False,
283    _progress: Optional['rich.progress.Progress'] = None,
284) -> str:
285    """
286    Format `meerschaum.utils.typing.SuccessTuple`.
287
288    Parameters
289    ----------
290    upper_padding: int, default 0
291        How many newlines to prepend to the message.
292
293    lower_padding: int, default 0
294        How many newlines to append to the message.
295
296    left_padding: int, default 1
297        How mant spaces to preprend to the message.
298
299    calm: bool, default False
300        If `True`, use the default emoji and color scheme.
301    """
302    _init()
303    try:
304        status = 'success' if tup[0] else 'failure'
305    except TypeError:
306        status = 'failure'
307        tup = None, None
308
309    if calm:
310        status += '_calm'
311
312    ANSI, CHARSET = __getattr__('ANSI'), __getattr__('CHARSET')
313    from meerschaum.config import get_config
314    status_config = get_config('formatting', status, patch=True)
315
316    msg = (' ' * left_padding) + status_config[CHARSET]['icon'] + ' ' + str(highlight_pipes(tup[1]))
317    lines = msg.split('\n')
318    lines = [lines[0]] + [
319        (('    ' + line if not line.startswith(' ') else line))
320        for line in lines[1:]
321    ]
322    if ANSI:
323        lines[0] = fill_ansi(lines[0], **status_config['ansi']['rich'])
324
325    msg = '\n'.join(lines)
326    msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
327    return msg

Format meerschaum.utils.typing.SuccessTuple.

Parameters
  • upper_padding (int, default 0): How many newlines to prepend to the message.
  • lower_padding (int, default 0): How many newlines to append to the message.
  • left_padding (int, default 1): How mant spaces to preprend to the message.
  • calm (bool, default False): If True, use the default emoji and color scheme.
def get_console():
204def get_console():
205    """Get the rich console."""
206    global console
207    from meerschaum.utils.packages import import_rich, attempt_import
208    rich = import_rich()
209    rich_console = attempt_import('rich.console')
210    try:
211        console = rich_console.Console(force_terminal=True, color_system='truecolor')
212    except Exception:
213        import traceback
214        traceback.print_exc()
215        console = None
216    return console

Get the rich console.

def highlight_pipes(message: str) -> str:
345def highlight_pipes(message: str) -> str:
346    """
347    Add syntax highlighting to an info message containing stringified `meerschaum.Pipe` objects.
348    """
349    if 'Pipe(' not in message:
350        return message
351
352    from meerschaum.utils.misc import parse_arguments_str
353    segments = message.split('Pipe(')
354    msg = ''
355    for i, segment in enumerate(segments):
356        if i == 0:
357            msg += segment
358            continue
359
360        paren_index = segment.find(')')
361        if paren_index == -1:
362            msg += 'Pipe(' + segment
363            continue
364        
365        pipe_args_str = segment[:paren_index]
366        try:
367            args, kwargs = parse_arguments_str(pipe_args_str)
368            pipe_dict = {
369                'connector_keys': args[0],
370                'metric_key': args[1],
371            }
372            if len(args) > 2:
373                pipe_dict['location_key'] = args[2]
374            if 'instance' in kwargs:
375                pipe_dict['instance_keys'] = kwargs['instance']
376            
377            _to_add = pipe_repr(pipe_dict) + segment[paren_index + 1:]
378        except Exception:
379            _to_add = 'Pipe(' + segment
380        msg += _to_add
381    return msg

Add syntax highlighting to an info message containing stringified meerschaum.Pipe objects.

def make_header(message: str, ruler: str = '─', left_pad: int = 2) -> str:
17def make_header(message: str, ruler: str = '─', left_pad: int = 2) -> str:
18    """Format a message string with a ruler.
19    Length of the ruler is the length of the longest word.
20    
21    Example:
22        'My\nheader' -> '  My\n  header\n  ──────'
23    """
24
25    from meerschaum.utils.formatting import ANSI, UNICODE, colored
26    if not UNICODE:
27        ruler = '-'
28    words = message.split('\n')
29    max_length = 0
30    for w in words:
31        length = len(w)
32        if length > max_length:
33            max_length = length
34
35    left_buffer = left_pad * ' '
36
37    return (
38        left_buffer
39        + message.replace('\n', '\n' + left_buffer)
40        + "\n"
41        + left_buffer
42        + (ruler * max_length)
43    )

Format a message string with a ruler. Length of the ruler is the length of the longest word.

Example:
    'My

header' -> ' My header ──────'

def pipe_repr( pipe: Union[meerschaum.Pipe, Dict[str, Any]], as_rich_text: bool = False, ansi: Optional[bool] = None) -> Union[str, rich.text.Text]:
280def pipe_repr(
281    pipe: Union[mrsm.Pipe, Dict[str, Any]],
282    as_rich_text: bool = False,
283    ansi: Optional[bool] = None,
284) -> Union[str, 'rich.text.Text']:
285    """
286    Return a formatted string for representing a `meerschaum.Pipe`.
287    """
288    from meerschaum.utils.formatting import ANSI, colored, rich_text_to_str
289    from meerschaum.utils.packages import import_rich, attempt_import
290    import meerschaum as mrsm
291
292    _ = import_rich()
293    Text = attempt_import('rich.text').Text
294
295    if isinstance(pipe, mrsm.Pipe):
296        connector_keys = pipe.connector_keys
297        metric_key = pipe.metric_key
298        location_key = pipe.location_key
299        instance_keys = pipe.instance_keys
300    else:
301        connector_keys = pipe.get('connector_keys')
302        metric_key = pipe.get('metric_key')
303        location_key = pipe.get('location_key')
304        instance_keys = pipe.get('instance_keys', get_config('meerschaum', 'instance'))
305
306    styles = get_config('formatting', 'pipes', '__repr__', 'ansi', 'styles')
307    if not ANSI or (ansi is False):
308        styles = {k: '' for k in styles}
309    _pipe_style_prefix, _pipe_style_suffix = (
310        (("[" + styles['Pipe'] + "]"), ("[/" + styles['Pipe'] + "]")) if styles['Pipe']
311        else ('', '')
312    )
313    text_obj = (
314        Text.from_markup(_pipe_style_prefix + "Pipe(" + _pipe_style_suffix)
315        + colored(("'" + connector_keys + "'"), style=styles['connector'], as_rich_text=True)
316        + Text.from_markup(_pipe_style_prefix + ", " + _pipe_style_suffix)
317        + colored(("'" + metric_key + "'"), style=styles['metric'], as_rich_text=True)
318        + (
319            (
320                colored(', ', style=styles['punctuation'], as_rich_text=True)
321                + colored(
322                    ("'" + location_key + "'"),
323                    style=styles['location'], as_rich_text=True
324                )
325            ) if location_key is not None
326            else colored('', style='', as_rich_text=True)
327        ) + (
328            ( ### Add the `instance=` argument.
329                colored(', instance=', style=styles['punctuation'], as_rich_text=True)
330                + colored(
331                    ("'" + instance_keys + "'"),
332                    style=styles['instance'], as_rich_text=True
333                )
334            ) if instance_keys != get_config('meerschaum', 'instance')
335            else colored('', style='', as_rich_text=True)
336        )
337        + Text.from_markup(_pipe_style_prefix + ")" + _pipe_style_suffix)
338    )
339    if as_rich_text:
340        return text_obj
341    return rich_text_to_str(text_obj).replace('\n', '')

Return a formatted string for representing a meerschaum.Pipe.

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11    *args,
 12    detect_password: bool = True,
 13    nopretty: bool = False,
 14    **kw
 15) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    import copy
 21    import json
 22    from meerschaum.utils.packages import attempt_import, import_rich
 23    from meerschaum.utils.formatting import ANSI, get_console, print_tuple
 24    from meerschaum.utils.warnings import error
 25    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 26    from collections import OrderedDict
 27
 28    if (
 29        len(args) == 1
 30        and
 31        isinstance(args[0], tuple)
 32        and
 33        len(args[0]) == 2
 34        and
 35        isinstance(args[0][0], bool)
 36        and
 37        isinstance(args[0][1], str)
 38    ):
 39        return print_tuple(args[0], **filter_keywords(print_tuple, **kw))
 40
 41    modify = True
 42    rich_pprint = None
 43    if ANSI and not nopretty:
 44        rich = import_rich()
 45        if rich is not None:
 46            rich_pretty = attempt_import('rich.pretty')
 47        if rich_pretty is not None:
 48            def _rich_pprint(*args, **kw):
 49                _console = get_console()
 50                _kw = filter_keywords(_console.print, **kw)
 51                _console.print(*args, **_kw)
 52            rich_pprint = _rich_pprint
 53    elif not nopretty:
 54        pprintpp = attempt_import('pprintpp', warn=False)
 55        try:
 56            _pprint = pprintpp.pprint
 57        except Exception :
 58            import pprint as _pprint_module
 59            _pprint = _pprint_module.pprint
 60
 61    func = (
 62        _pprint if rich_pprint is None else rich_pprint
 63    ) if not nopretty else print
 64
 65    try:
 66        args_copy = copy.deepcopy(args)
 67    except Exception:
 68        args_copy = args
 69        modify = False
 70
 71    _args = []
 72    for a in args:
 73        c = a
 74        ### convert OrderedDict into dict
 75        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 76            c = dict_from_od(copy.deepcopy(c))
 77        _args.append(c)
 78    args = _args
 79
 80    _args = list(args)
 81    if detect_password and modify:
 82        _args = []
 83        for a in args:
 84            c = a
 85            if isinstance(c, dict):
 86                c = replace_password(copy.deepcopy(c))
 87            if nopretty:
 88                try:
 89                    c = json.dumps(c)
 90                    is_json = True
 91                except Exception:
 92                    is_json = False
 93                if not is_json:
 94                    try:
 95                        c = str(c)
 96                    except Exception:
 97                        pass
 98            _args.append(c)
 99
100    ### filter out unsupported keywords
101    func_kw = filter_keywords(func, **kw) if not nopretty else {}
102    error_msg = None
103    try:
104        func(*_args, **func_kw)
105    except Exception as e:
106        error_msg = e
107    if error_msg is not None:
108        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

def pprint_pipes( pipes: Dict[str, Dict[str, Dict[str, meerschaum.Pipe]]]) -> None:
 17def pprint_pipes(pipes: PipesDict) -> None:
 18    """Print a stylized tree of a Pipes dictionary.
 19    Supports ANSI and UNICODE global settings."""
 20    from meerschaum.utils.warnings import error
 21    from meerschaum.utils.packages import attempt_import, import_rich
 22    from meerschaum.utils.misc import sorted_dict, replace_pipes_in_dict
 23    from meerschaum.utils.formatting import UNICODE, ANSI, CHARSET, pprint, colored, get_console
 24    import copy
 25    rich = import_rich('rich', warn=False)
 26    Text = None
 27    if rich is not None:
 28        rich_text = attempt_import('rich.text', lazy=False)
 29        Text = rich_text.Text
 30
 31    icons = get_config('formatting', 'pipes', CHARSET, 'icons')
 32    styles = get_config('formatting', 'pipes', 'ansi', 'styles')
 33    if not ANSI:
 34        styles = {k: '' for k in styles}
 35    print()
 36
 37    def ascii_print_pipes():
 38        """Print the dictionary with no unicode allowed. Also works in case rich fails to import
 39        (though rich should auto-install when `attempt_import()` is called)."""
 40        asciitree = attempt_import('asciitree')
 41        ascii_dict, replace_dict = {}, {'connector': {}, 'metric': {}, 'location': {}}
 42        for conn_keys, metrics in pipes.items():
 43            _colored_conn_key = colored(icons['connector'] + conn_keys, style=styles['connector'])
 44            if Text is not None:
 45                replace_dict['connector'][_colored_conn_key] = (
 46                    Text(conn_keys, style=styles['connector'])
 47                )
 48            ascii_dict[_colored_conn_key] = {}
 49            for metric, locations in metrics.items():
 50                _colored_metric_key = colored(icons['metric'] + metric, style=styles['metric'])
 51                if Text is not None:
 52                    replace_dict['metric'][_colored_metric_key] = (
 53                        Text(metric, style=styles['metric'])
 54                    )
 55                ascii_dict[_colored_conn_key][_colored_metric_key] = {}
 56                for location, pipe in locations.items():
 57                    _location_style = styles[('none' if location is None else 'location')]
 58                    pipe_addendum = '\n         ' + pipe.__repr__() + '\n'
 59                    _colored_location = colored(
 60                        icons['location'] + str(location), style=_location_style
 61                    )
 62                    _colored_location_key = _colored_location + pipe_addendum
 63                    if Text is not None:
 64                        replace_dict['location'][_colored_location] = (
 65                            Text(str(location), style=_location_style)
 66                        )
 67                    ascii_dict[_colored_conn_key][_colored_metric_key][_colored_location_key] = {}
 68
 69        tree = asciitree.LeftAligned()
 70        output = ''
 71        cols = []
 72
 73        ### This is pretty terrible, unreadable code.
 74        ### Please know that I'm normally better than this.
 75        key_str = (
 76            (Text("     ") if Text is not None else "     ") +
 77            (
 78                Text("Key", style='underline') if Text is not None else
 79                colored("Key", style='underline')
 80            ) + (Text('\n\n  ') if Text is not None else '\n\n  ') +
 81            (
 82                Text("Connector", style=styles['connector']) if Text is not None else
 83                colored("Connector", style=styles['connector'])
 84            ) + (Text('\n   +-- ') if Text is not None else '\n   +-- ') +
 85            (
 86                Text("Metric", style=styles['metric']) if Text is not None else
 87                colored("Metric", style=styles['metric'])
 88            ) + (Text('\n       +-- ') if Text is not None else '\n       +-- ') +
 89            (
 90                Text("Location", style=styles['location']) if Text is not None else
 91                colored("Location", style=styles['location'])
 92            ) + (Text('\n\n') if Text is not None else '\n\n')
 93        )
 94
 95        output += str(key_str)
 96        cols.append(key_str)
 97
 98        def replace_tree_text(tree_str : str) -> Text:
 99            """Replace the colored words with stylized Text instead.
100            Is not executed if ANSI and UNICODE are disabled."""
101            tree_text = Text(tree_str) if Text is not None else None
102            for k, v in replace_dict.items():
103                for _colored, _text in v.items():
104                    parts = []
105                    lines = tree_text.split(_colored)
106                    for part in lines:
107                        parts += [part, _text]
108                    if lines[-1] != Text(''):
109                        parts = parts[:-1]
110                    _tree_text = Text('')
111                    for part in parts:
112                        _tree_text += part
113                    tree_text = _tree_text
114            return tree_text
115
116        tree_output = ""
117        for k, v in ascii_dict.items():
118            branch = {k : v}
119            tree_output += tree(branch) + '\n\n'
120            if not UNICODE and not ANSI:
121                _col = (Text(tree(branch)) if Text is not None else tree(branch))
122            else:
123                _col = replace_tree_text(tree(branch))
124            cols.append(_col)
125        if len(output) > 0:
126            tree_output = tree_output[:-2]
127        output += tree_output
128
129        if rich is None:
130            return print(output)
131
132        rich_columns = attempt_import('rich.columns')
133        Columns = rich_columns.Columns
134        columns = Columns(cols)
135        get_console().print(columns)
136
137    if not UNICODE:
138        return ascii_print_pipes()
139
140    rich_panel, rich_tree, rich_text, rich_columns, rich_table = attempt_import(
141        'rich.panel',
142        'rich.tree',
143        'rich.text',
144        'rich.columns',
145        'rich.table',
146    )
147    from rich import box
148    Panel = rich_panel.Panel
149    Tree = rich_tree.Tree
150    Text = rich_text.Text
151    Columns = rich_columns.Columns
152    Table = rich_table.Table
153
154    key_panel = Panel(
155        (
156            Text("\n") +
157            Text(icons['connector'] + "Connector", style=styles['connector']) + Text("\n\n") +
158            Text(icons['metric'] + "Metric", style=styles['metric']) + Text("\n\n") +
159            Text(icons['location'] + "Location", style=styles['location']) + Text("\n")
160        ),
161        title = Text(icons['key'] + "Keys", style=styles['guide']),
162        border_style = styles['guide'],
163        expand = True
164    )
165
166    cols = []
167    conn_trees = {}
168    metric_trees = {}
169    pipes = sorted_dict(pipes)
170    for conn_keys, metrics in pipes.items():
171        conn_trees[conn_keys] = Tree(
172            Text(
173                icons['connector'] + conn_keys,
174                style = styles['connector'],
175            ),
176            guide_style = styles['connector']
177        )
178        metric_trees[conn_keys] = {}
179        for metric, locations in metrics.items():
180            metric_trees[conn_keys][metric] = Tree(
181                Text(
182                    icons['metric'] + metric,
183                    style = styles['metric']
184                ),
185                guide_style = styles['metric']
186            )
187            conn_trees[conn_keys].add(metric_trees[conn_keys][metric])
188            for location, pipe in locations.items():
189                _location = (
190                    Text(str(location), style=styles['none']) if location is None
191                    else Text(location, style=styles['location'])
192                )
193                _location = (
194                    Text(icons['location'])
195                    + _location + Text('\n')
196                    + pipe_repr(pipe, as_rich_text=True) + Text('\n')
197                )
198                metric_trees[conn_keys][metric].add(_location)
199
200    cols += [key_panel]
201    for k, t in conn_trees.items():
202        cols.append(t)
203
204    columns = Columns(cols)
205    get_console().print(columns)

Print a stylized tree of a Pipes dictionary. Supports ANSI and UNICODE global settings.

def translate_rich_to_termcolor(*colors) -> tuple:
60def translate_rich_to_termcolor(*colors) -> tuple:
61    """Translate between rich and more_termcolor terminology."""
62    _colors = []
63    for c in colors:
64        _c_list = []
65        ### handle 'bright'
66        c = c.replace('bright_', 'bright ')
67
68        ### handle 'on'
69        if ' on ' in c:
70            _on = c.split(' on ')
71            _colors.append(_on[0])
72            for _c in _on[1:]:
73                _c_list.append('on ' + _c)
74        else:
75            _c_list += [c]
76
77        _colors += _c_list
78
79    return tuple(_colors)

Translate between rich and more_termcolor terminology.